You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/05 06:19:57 UTC
[flink-table-store] branch master updated: [FLINK-28118] Introduce TableStoreOptions to merge all options
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 80b69f77 [FLINK-28118] Introduce TableStoreOptions to merge all options
80b69f77 is described below
commit 80b69f77af418da669bbeb70ee7e7320aafdb945
Author: liliwei <hi...@gmail.com>
AuthorDate: Tue Jul 5 14:19:51 2022 +0800
[FLINK-28118] Introduce TableStoreOptions to merge all options
This closes #196
---
.../store/connector/TableStoreManagedFactory.java | 18 +-
.../store/connector/AbstractTableStoreFactory.java | 28 +-
.../flink/table/store/connector/FlinkCatalog.java | 2 +-
.../store/connector/sink/FlinkSinkBuilder.java | 4 +-
.../table/store/connector/sink/TableStoreSink.java | 8 +-
.../store/connector/source/FlinkSourceBuilder.java | 18 +-
.../store/connector/source/TableStoreSource.java | 12 +-
.../ComputedColumnAndWatermarkTableITCase.java | 20 +-
.../table/store/connector/CreateTableITCase.java | 12 +-
.../table/store/connector/DropTableITCase.java | 6 +-
.../table/store/connector/FileStoreITCase.java | 10 +-
.../store/connector/FileStoreTableITCase.java | 4 +-
.../store/connector/ReadWriteTableTestBase.java | 16 +-
.../table/store/connector/RescaleBucketITCase.java | 2 +-
.../connector/TableStoreManagedFactoryTest.java | 29 +-
.../table/store/connector/TableStoreTestBase.java | 8 +-
.../store/connector/sink/LogStoreSinkITCase.java | 8 +-
.../source/TestChangelogDataReadWrite.java | 7 +-
.../org/apache/flink/table/store/CoreOptions.java | 582 +++++++++++++++++++++
.../flink/table/store/file/AbstractFileStore.java | 7 +-
.../table/store/file/AppendOnlyFileStore.java | 5 +-
.../flink/table/store/file/KeyValueFileStore.java | 5 +-
.../file/operation/KeyValueFileStoreWrite.java | 6 +-
.../store/file/utils/FileStorePathFactory.java | 4 +-
.../table/store/log/LogStoreTableFactory.java | 18 +-
.../store/table/AppendOnlyFileStoreTable.java | 4 +-
.../table/ChangelogValueCountFileStoreTable.java | 4 +-
.../table/ChangelogWithKeyFileStoreTable.java | 6 +-
.../table/store/table/FileStoreTableFactory.java | 8 +-
.../table/store/table/sink/MemoryTableWrite.java | 11 +-
.../flink/table/store/file/FileFormatTest.java | 5 +-
.../flink/table/store/file/TestFileStore.java | 22 +-
.../store/file/data/AppendOnlyWriterTest.java | 4 +-
.../store/file/data/DataFilePathFactoryTest.java | 10 +-
.../store/file/manifest/ManifestFileMetaTest.java | 4 +-
.../store/file/manifest/ManifestFileTest.java | 4 +-
.../store/file/manifest/ManifestListTest.java | 4 +-
.../table/store/file/mergetree/MergeTreeTest.java | 11 +-
.../store/file/utils/FileStorePathFactoryTest.java | 4 +-
.../store/table/AppendOnlyFileStoreTableTest.java | 8 +-
.../ChangelogValueCountFileStoreTableTest.java | 8 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 11 +-
.../table/store/table/WritePreemptMemoryTest.java | 13 +-
.../table/store/mapred/TableStoreInputFormat.java | 4 +-
.../flink/table/store/FileStoreTestUtils.java | 5 +-
.../hive/TableStoreHiveStorageHandlerITCase.java | 26 +-
.../store/mapred/TableStoreRecordReaderTest.java | 10 +-
.../store/kafka/KafkaLogSerializationSchema.java | 2 +-
.../table/store/kafka/KafkaLogSinkProvider.java | 4 +-
.../table/store/kafka/KafkaLogSourceProvider.java | 4 +-
.../table/store/kafka/KafkaLogStoreFactory.java | 44 +-
.../flink/table/store/kafka/KafkaLogITCase.java | 4 +-
.../store/kafka/KafkaLogSerializationTest.java | 2 +-
.../flink/table/store/kafka/KafkaLogTestUtils.java | 12 +-
.../table/store/spark/SimpleTableTestHelper.java | 4 +-
55 files changed, 838 insertions(+), 263 deletions(-)
diff --git a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 9d326646..431e5c9b 100644
--- a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.ManagedTableFactory;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
@@ -38,15 +38,15 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.WRITE_MODE;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.file.FileStoreOptions.WRITE_MODE;
import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
/** Default implementation of {@link ManagedTableFactory}. */
public class TableStoreManagedFactory extends AbstractTableStoreFactory
@@ -103,7 +103,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
@Override
public void onCreateTable(Context context, boolean ignoreIfExists) {
Map<String, String> options = context.getCatalogTable().getOptions();
- Path path = FileStoreOptions.path(options);
+ Path path = CoreOptions.path(options);
try {
if (path.getFileSystem().exists(path) && !ignoreIfExists) {
throw new TableException(
@@ -161,7 +161,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
@Override
public void onDropTable(Context context, boolean ignoreIfNotExists) {
- Path path = FileStoreOptions.path(context.getCatalogTable().getOptions());
+ Path path = CoreOptions.path(context.getCatalogTable().getOptions());
try {
if (path.getFileSystem().exists(path)) {
path.getFileSystem().delete(path, true);
@@ -186,7 +186,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
public Map<String, String> onCompactTable(
Context context, CatalogPartitionSpec catalogPartitionSpec) {
Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
- if (APPEND_ONLY.toString().equals(newOptions.get(FileStoreOptions.WRITE_MODE.key()))) {
+ if (APPEND_ONLY.toString().equals(newOptions.get(CoreOptions.WRITE_MODE.key()))) {
throw new UnsupportedOperationException(
"ALTER TABLE COMPACT is not yet supported for append only table.");
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index 8f6659c3..54de82ff 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -27,13 +27,11 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
import org.apache.flink.table.store.connector.source.TableStoreSource;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
@@ -47,11 +45,11 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
-import static org.apache.flink.table.store.log.LogOptions.SCAN;
import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
/** Abstract table store factory to create table source and table sink. */
@@ -85,8 +83,8 @@ public abstract class AbstractTableStoreFactory
@Override
public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
- options.addAll(MergeTreeOptions.allOptions());
+ Set<ConfigOption<?>> options = CoreOptions.allOptions();
+ options.addAll(CoreOptions.allOptions());
options.addAll(TableStoreFactoryOptions.allOptions());
return options;
}
@@ -115,18 +113,18 @@ public abstract class AbstractTableStoreFactory
private static void validateFileStoreContinuous(Configuration options) {
Configuration logOptions = new DelegatingConfiguration(options, LOG_PREFIX);
- LogOptions.LogChangelogMode changelogMode = logOptions.get(CHANGELOG_MODE);
- if (changelogMode == LogOptions.LogChangelogMode.UPSERT) {
+ CoreOptions.LogChangelogMode changelogMode = logOptions.get(LOG_CHANGELOG_MODE);
+ if (changelogMode == CoreOptions.LogChangelogMode.UPSERT) {
throw new ValidationException(
"File store continuous reading dose not support upsert changelog mode.");
}
- LogOptions.LogConsistency consistency = logOptions.get(CONSISTENCY);
- if (consistency == LogOptions.LogConsistency.EVENTUAL) {
+ CoreOptions.LogConsistency consistency = logOptions.get(LOG_CONSISTENCY);
+ if (consistency == CoreOptions.LogConsistency.EVENTUAL) {
throw new ValidationException(
"File store continuous reading dose not support eventual consistency mode.");
}
- LogOptions.LogStartupMode startupMode = logOptions.get(SCAN);
- if (startupMode == LogOptions.LogStartupMode.FROM_TIMESTAMP) {
+ CoreOptions.LogStartupMode startupMode = logOptions.get(LOG_SCAN);
+ if (startupMode == CoreOptions.LogStartupMode.FROM_TIMESTAMP) {
throw new ValidationException(
"File store continuous reading dose not support from_timestamp scan mode, "
+ "you can add timestamp filters instead.");
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 6077e57d..11d79dd4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -52,8 +52,8 @@ import java.util.Map;
import java.util.Optional;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.table.store.CoreOptions.PATH;
import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
/** Catalog for table store. */
public class FlinkCatalog extends AbstractCatalog {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 91141ece..d0d9cccb 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -24,8 +24,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
@@ -89,7 +89,7 @@ public class FlinkSinkBuilder {
}
public DataStreamSink<?> build() {
- int numBucket = conf.get(FileStoreOptions.BUCKET);
+ int numBucket = conf.get(CoreOptions.BUCKET);
BucketStreamPartitioner partitioner =
new BucketStreamPartitioner(numBucket, table.schema());
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 81ad0dba..0f4e5dac 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -28,9 +28,9 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
-import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -78,8 +78,10 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
Configuration logOptions =
new DelegatingConfiguration(
- Configuration.fromMap(table.schema().options()), LogOptions.LOG_PREFIX);
- if (logOptions.get(LogOptions.CHANGELOG_MODE) != LogOptions.LogChangelogMode.ALL) {
+ Configuration.fromMap(table.schema().options()),
+ CoreOptions.LOG_PREFIX);
+ if (logOptions.get(CoreOptions.LOG_CHANGELOG_MODE)
+ != CoreOptions.LogChangelogMode.ALL) {
// with primary key, default sink upsert
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (RowKind kind : requestedMode.getContainedKinds()) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index ebeb5482..396f6d94 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -30,10 +30,9 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.utils.Projection;
@@ -95,7 +94,7 @@ public class FlinkSourceBuilder {
}
private long discoveryIntervalMills() {
- return conf.get(FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
+ return conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
}
private FileStoreSource buildFileSource(boolean isContinuous, boolean continuousScanLatest) {
@@ -112,18 +111,19 @@ public class FlinkSourceBuilder {
if (isContinuous) {
// TODO move validation to a dedicated method
if (table.schema().primaryKeys().size() > 0
- && conf.get(FileStoreOptions.MERGE_ENGINE)
- == FileStoreOptions.MergeEngine.PARTIAL_UPDATE) {
+ && conf.get(CoreOptions.MERGE_ENGINE)
+ == CoreOptions.MergeEngine.PARTIAL_UPDATE) {
throw new ValidationException(
"Partial update continuous reading is not supported.");
}
- LogOptions.LogStartupMode startupMode =
- new DelegatingConfiguration(conf, LogOptions.LOG_PREFIX).get(LogOptions.SCAN);
+ CoreOptions.LogStartupMode startupMode =
+ new DelegatingConfiguration(conf, CoreOptions.LOG_PREFIX)
+ .get(CoreOptions.LOG_SCAN);
if (logSourceProvider == null) {
- return buildFileSource(true, startupMode == LogOptions.LogStartupMode.LATEST);
+ return buildFileSource(true, startupMode == CoreOptions.LogStartupMode.LATEST);
} else {
- if (startupMode != LogOptions.LogStartupMode.FULL) {
+ if (startupMode != CoreOptions.LogStartupMode.FULL) {
return logSourceProvider.createSource(null);
}
return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 5d03ee02..880c6a9c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -28,12 +28,12 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.predicate.PredicateConverter;
-import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -94,10 +94,12 @@ public class TableStoreSource
// normalized nodes. See TableStoreSink.getChangelogMode validation.
Configuration logOptions =
new DelegatingConfiguration(
- Configuration.fromMap(table.schema().options()), LogOptions.LOG_PREFIX);
- return logOptions.get(LogOptions.CONSISTENCY) == LogOptions.LogConsistency.TRANSACTIONAL
- && logOptions.get(LogOptions.CHANGELOG_MODE)
- == LogOptions.LogChangelogMode.ALL
+ Configuration.fromMap(table.schema().options()),
+ CoreOptions.LOG_PREFIX);
+ return logOptions.get(CoreOptions.LOG_CONSISTENCY)
+ == CoreOptions.LogConsistency.TRANSACTIONAL
+ && logOptions.get(CoreOptions.LOG_CHANGELOG_MODE)
+ == CoreOptions.LogChangelogMode.ALL
? ChangelogMode.all()
: ChangelogMode.upsert();
} else {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
index a7fe99b3..be277d35 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.junit.Test;
@@ -29,9 +29,9 @@ import java.util.Collections;
import java.util.stream.Collectors;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.ratesWithTimestamp;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
/** Table store IT case when the managed table has computed column and watermark spec. */
public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBase {
@@ -146,8 +146,8 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
null,
false,
Collections.singletonMap(
- LOG_PREFIX + LogOptions.SCAN.key(),
- LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+ CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
"rate_by_to_currency IS NULL",
Arrays.asList(
"corrected_rate_by_to_currency",
@@ -191,8 +191,8 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
false,
Collections.singletonMap(
- LOG_PREFIX + LogOptions.SCAN.key(),
- LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+ CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
lateEventFilter,
Collections.emptyList(), // projection
Collections.singletonList(
@@ -215,8 +215,8 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
WatermarkSpec.of("ts1", "ts1 - INTERVAL '3' YEAR"),
false,
Collections.singletonMap(
- LOG_PREFIX + LogOptions.SCAN.key(),
- LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+ CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
lateEventFilter.replaceAll("ts", "ts1"),
Arrays.asList("currency", "rate", "ts1"),
Collections.singletonList(
@@ -240,8 +240,8 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
false,
Collections.singletonMap(
- LOG_PREFIX + LogOptions.SCAN.key(),
- LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+ CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
lateEventFilter,
Arrays.asList(
"currency",
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
index cbbe4b4b..9e3be19f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -41,7 +41,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -79,9 +79,7 @@ public class CreateTableITCase extends TableStoreTestBase {
assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
.isPresent();
// check table store
- assertThat(
- Paths.get(rootPath, FileStoreOptions.relativeTablePath(tableIdentifier))
- .toFile())
+ assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile())
.exists();
// check log store
assertThat(topicExists(tableIdentifier.asSummaryString())).isEqualTo(enableLogStore);
@@ -131,9 +129,7 @@ public class CreateTableITCase extends TableStoreTestBase {
}
} else if (expectedResult.expectedMessage.startsWith("Failed to create file store path.")) {
// failed when creating file store
- Paths.get(rootPath, FileStoreOptions.relativeTablePath(tableIdentifier))
- .toFile()
- .mkdirs();
+ Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile().mkdirs();
} else if (expectedResult.expectedMessage.startsWith("Failed to create kafka topic.")) {
// failed when creating log store
createTopicIfNotExists(tableIdentifier.asSummaryString(), BUCKET.defaultValue());
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
index 89631e42..e7278976 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -80,9 +80,7 @@ public class DropTableITCase extends TableStoreTestBase {
assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
.isNotPresent();
// check table store
- assertThat(
- Paths.get(rootPath, FileStoreOptions.relativeTablePath(tableIdentifier))
- .toFile())
+ assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile())
.doesNotExist();
// check log store
assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 43df8244..1eb0596c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -32,11 +32,11 @@ import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
import org.apache.flink.table.store.connector.sink.StoreSink;
import org.apache.flink.table.store.connector.source.FileStoreSource;
import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.BlockingIterator;
@@ -70,9 +70,9 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.FILE_FORMAT;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.FILE_FORMAT;
+import static org.apache.flink.table.store.CoreOptions.PATH;
import static org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem.retryArtificialException;
import static org.assertj.core.api.Assertions.assertThat;
@@ -365,7 +365,7 @@ public class FileStoreITCase extends AbstractTestBase {
boolean noFail, TemporaryFolder temporaryFolder, int[] partitions, int[] primaryKey)
throws Exception {
Configuration options = buildConfiguration(noFail, temporaryFolder.newFolder());
- Path tablePath = new FileStoreOptions(options).path();
+ Path tablePath = new CoreOptions(options).path();
UpdateSchema updateSchema =
new UpdateSchema(
TABLE_TYPE,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 4e6e8c59..4e313402 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -51,9 +51,9 @@ import java.time.Duration;
import java.util.List;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.relativeTablePath;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
import static org.junit.jupiter.api.Assertions.fail;
/** ITCase for file store table api. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index 1e233b76..33d28349 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -30,10 +30,9 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
-import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.types.Row;
import javax.annotation.Nullable;
@@ -50,6 +49,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithChangelogRecords;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithInsertOnlyRecords;
import static org.apache.flink.table.store.connector.ShowCreateUtil.buildInsertIntoQuery;
@@ -60,7 +60,6 @@ import static org.apache.flink.table.store.connector.ShowCreateUtil.createTableL
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
/** Table store read write test base. */
@@ -72,7 +71,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
protected void checkFileStorePath(
StreamTableEnvironment tEnv, String managedTable, @Nullable String partitionList) {
String relativeFilePath =
- FileStoreOptions.relativeTablePath(
+ CoreOptions.relativeTablePath(
ObjectIdentifier.of(
tEnv.getCurrentCatalog(), tEnv.getCurrentDatabase(), managedTable));
// check snapshot file path
@@ -232,8 +231,8 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
throws Exception {
Map<String, String> hints = new HashMap<>();
hints.put(
- LOG_PREFIX + LogOptions.SCAN.key(),
- LogOptions.LogStartupMode.LATEST.name().toLowerCase());
+ LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
+ CoreOptions.LogStartupMode.LATEST.name().toLowerCase());
collectAndCheckUnderSameEnv(
true,
true,
@@ -259,8 +258,9 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
List<Row> expected)
throws Exception {
Map<String, String> hints = new HashMap<>();
- hints.put(LOG_PREFIX + LogOptions.SCAN.key(), "from-timestamp");
- hints.put(LOG_PREFIX + LogOptions.SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
+ hints.put(LOG_PREFIX + CoreOptions.LOG_SCAN.key(), "from-timestamp");
+ hints.put(
+ LOG_PREFIX + CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
collectAndCheckUnderSameEnv(
true,
true,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
index 996917ca..d3efcf5f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -38,7 +38,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index 252f9828..31832f59 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -28,9 +28,8 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
-import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.types.logical.RowType;
@@ -56,22 +55,22 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.path;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.file.FileStoreOptions.path;
import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PART_TYPE;
import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
import static org.apache.flink.table.store.file.TestKeyValueGenerator.getPrimaryKeys;
import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -194,13 +193,13 @@ public class TableStoreManagedFactoryTest {
// mix invalid key and leave value to empty to emphasize the deferred validation
Map<String, String> expectedLogOptions =
of(
- LogOptions.SCAN.key(),
+ CoreOptions.LOG_SCAN.key(),
"",
- LogOptions.RETENTION.key(),
+ CoreOptions.LOG_RETENTION.key(),
"",
"dummy.key",
"",
- LogOptions.CHANGELOG_MODE.key(),
+ CoreOptions.LOG_CHANGELOG_MODE.key(),
"");
Map<String, String> enrichedOptions =
addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
@@ -273,7 +272,7 @@ public class TableStoreManagedFactoryTest {
context =
createEnrichedContext(
Collections.singletonMap(
- FileStoreOptions.WRITE_MODE.key(), APPEND_ONLY.toString()));
+ CoreOptions.WRITE_MODE.key(), APPEND_ONLY.toString()));
assertThatThrownBy(
() ->
tableStoreManagedFactory.onCompactTable(
@@ -325,8 +324,8 @@ public class TableStoreManagedFactoryTest {
sharedTempDir.toString(),
LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
"localhost:9092",
- LOG_PREFIX + CONSISTENCY.key(),
- CONSISTENCY.defaultValue().name());
+ LOG_PREFIX + LOG_CONSISTENCY.key(),
+ LOG_CONSISTENCY.defaultValue().name());
// set configuration under session level
Arguments arg1 =
@@ -342,7 +341,7 @@ public class TableStoreManagedFactoryTest {
// set both session and table level configuration to test options combination
Map<String, String> tableOptions = new HashMap<>(enrichedOptions);
tableOptions.remove(ROOT_PATH.key());
- tableOptions.remove(CONSISTENCY.key());
+ tableOptions.remove(LOG_CONSISTENCY.key());
Arguments arg3 =
Arguments.of(
addPrefix(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index 558f6d25..1f321af9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -31,7 +31,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -50,11 +50,11 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
/** End-to-end test base for table store. */
public abstract class TableStoreTestBase extends KafkaTableTestBase {
@@ -154,7 +154,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
protected void deleteTablePath() {
FileUtils.deleteQuietly(
- Paths.get(rootPath, FileStoreOptions.relativeTablePath(tableIdentifier)).toFile());
+ Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile());
}
/** Expected result wrapper. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 201f1692..59465a73 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
@@ -28,7 +29,6 @@ import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
import org.apache.flink.table.store.kafka.KafkaLogTestUtils;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
-import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.types.Row;
@@ -102,10 +102,10 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
KafkaLogTestUtils.testContext(
name,
getBootstrapServers(),
- LogOptions.LogChangelogMode.AUTO,
+ CoreOptions.LogChangelogMode.AUTO,
transaction
- ? LogOptions.LogConsistency.TRANSACTIONAL
- : LogOptions.LogConsistency.EVENTUAL,
+ ? CoreOptions.LogConsistency.TRANSACTIONAL
+ : CoreOptions.LogConsistency.EVENTUAL,
TABLE_TYPE,
hasPk ? new int[] {2} : new int[0]);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index a99d05c0..4a4be635 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -24,12 +24,11 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.memory.MemoryOwner;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
@@ -81,7 +80,7 @@ public class TestChangelogDataReadWrite {
tablePath,
RowType.of(new IntType()),
"default",
- FileStoreOptions.FILE_FORMAT.defaultValue());
+ CoreOptions.FILE_FORMAT.defaultValue());
this.snapshotManager = new SnapshotManager(new Path(root));
this.service = service;
}
@@ -140,7 +139,7 @@ public class TestChangelogDataReadWrite {
}
public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRowData partition, int bucket) {
- MergeTreeOptions options = new MergeTreeOptions(new Configuration());
+ CoreOptions options = new CoreOptions(new Configuration());
RecordWriter<KeyValue> writer =
new KeyValueFileStoreWrite(
new SchemaManager(tablePath),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
new file mode 100644
index 00000000..1df27218
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.InlineElement;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
+
+/** Core options for table store. */
+public class CoreOptions implements Serializable {
+ public static final String LOG_PREFIX = "log.";
+ public static final String TABLE_STORE_PREFIX = "table-store.";
+
+ public static final ConfigOption<Integer> BUCKET =
+ ConfigOptions.key("bucket")
+ .intType()
+ .defaultValue(1)
+ .withDescription("Bucket number for file store.");
+
+ public static final ConfigOption<String> PATH =
+ ConfigOptions.key("path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The file path of this table in the filesystem.");
+
+ public static final ConfigOption<String> FILE_FORMAT =
+ ConfigOptions.key("file.format")
+ .stringType()
+ .defaultValue("orc")
+ .withDescription("Specify the message format of data files.");
+
+ public static final ConfigOption<String> MANIFEST_FORMAT =
+ ConfigOptions.key("manifest.format")
+ .stringType()
+ .defaultValue("avro")
+ .withDescription("Specify the message format of manifest files.");
+
+ public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
+ ConfigOptions.key("manifest.target-file-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(8))
+ .withDescription("Suggested file size of a manifest file.");
+
+ public static final ConfigOption<Integer> MANIFEST_MERGE_MIN_COUNT =
+ ConfigOptions.key("manifest.merge-min-count")
+ .intType()
+ .defaultValue(30)
+ .withDescription(
+ "To avoid frequent manifest merges, this parameter specifies the minimum number "
+ + "of ManifestFileMeta to merge.");
+
+ public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
+ key("partition.default-name")
+ .stringType()
+ .defaultValue("__DEFAULT_PARTITION__")
+ .withDescription(
+ "The default partition name in case the dynamic partition"
+ + " column value is null/empty string.");
+
+ public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN =
+ ConfigOptions.key("snapshot.num-retained.min")
+ .intType()
+ .defaultValue(10)
+ .withDescription("The minimum number of completed snapshots to retain.");
+
+ public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MAX =
+ ConfigOptions.key("snapshot.num-retained.max")
+ .intType()
+ .defaultValue(Integer.MAX_VALUE)
+ .withDescription("The maximum number of completed snapshots to retain.");
+
+ public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
+ ConfigOptions.key("snapshot.time-retained")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription("The maximum time of completed snapshots to retain.");
+
+ public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
+ ConfigOptions.key("continuous.discovery-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("The discovery interval of continuous reading.");
+
+ public static final ConfigOption<CoreOptions.MergeEngine> MERGE_ENGINE =
+ ConfigOptions.key("merge-engine")
+ .enumType(CoreOptions.MergeEngine.class)
+ .defaultValue(CoreOptions.MergeEngine.DEDUPLICATE)
+ .withDescription(
+ Description.builder()
+ .text("Specify the merge engine for table with primary key.")
+ .linebreak()
+ .list(
+ formatEnumOption(CoreOptions.MergeEngine.DEDUPLICATE),
+ formatEnumOption(
+ CoreOptions.MergeEngine.PARTIAL_UPDATE))
+ .build());
+
+ public static final ConfigOption<WriteMode> WRITE_MODE =
+ ConfigOptions.key("write-mode")
+ .enumType(WriteMode.class)
+ .defaultValue(WriteMode.CHANGE_LOG)
+ .withDescription(
+ Description.builder()
+ .text("Specify the write mode for table.")
+ .linebreak()
+ .list(formatEnumOption(WriteMode.APPEND_ONLY))
+ .list(formatEnumOption(WriteMode.CHANGE_LOG))
+ .build());
+
+ public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
+ ConfigOptions.key("source.split.target-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(128))
+ .withDescription("Target size of a source split when scanning a bucket.");
+
+ public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
+ ConfigOptions.key("source.split.open-file-cost")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(4))
+ .withDescription(
+ "Open file cost of a source file. It is used to avoid reading"
+ + " too many files with a source split, which can be very slow.");
+
+ public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
+ ConfigOptions.key("write-buffer-size")
+ .memoryType()
+ .defaultValue(MemorySize.parse("256 mb"))
+ .withDescription(
+ "Amount of data to build up in memory before converting to a sorted on-disk file.");
+
+ public static final ConfigOption<MemorySize> PAGE_SIZE =
+ ConfigOptions.key("page-size")
+ .memoryType()
+ .defaultValue(MemorySize.parse("64 kb"))
+ .withDescription("Memory page size.");
+
+ public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
+ ConfigOptions.key("target-file-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(128))
+ .withDescription("Target size of a file.");
+
+ public static final ConfigOption<Integer> NUM_SORTED_RUNS_COMPACTION_TRIGGER =
+ ConfigOptions.key("num-sorted-run.compaction-trigger")
+ .intType()
+ .defaultValue(5)
+ .withDescription(
+ "The sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and "
+ + "high-level runs (one level one sorted run).");
+
+ public static final ConfigOption<Integer> NUM_SORTED_RUNS_STOP_TRIGGER =
+ ConfigOptions.key("num-sorted-run.stop-trigger")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "The number of sorted-runs that trigger the stopping of writes.");
+
+ public static final ConfigOption<Integer> NUM_LEVELS =
+ ConfigOptions.key("num-levels")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Total level number, for example, there are 3 levels, including 0,1,2 levels.");
+
+ public static final ConfigOption<Boolean> COMMIT_FORCE_COMPACT =
+ ConfigOptions.key("commit.force-compact")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to force a compaction before commit.");
+
+ public static final ConfigOption<Integer> COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT =
+ ConfigOptions.key("compaction.max-size-amplification-percent")
+ .intType()
+ .defaultValue(200)
+ .withDescription(
+ "The size amplification is defined as the amount (in percentage) of additional storage "
+ + "needed to store a single byte of data in the merge tree.");
+
+ public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO =
+ ConfigOptions.key("compaction.size-ratio")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) "
+ + "size is 1% smaller than the next sorted run's size, then include next sorted run "
+ + "into this candidate set.");
+
+ public static final ConfigOption<Boolean> CHANGELOG_FILE =
+ ConfigOptions.key("changelog-file")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to double write to a changelog file when flushing memory table. "
+ + "This changelog file keeps the order of data input and the details of data changes, "
+ + "it can be read directly during stream reads.");
+
+ public static final ConfigOption<LogStartupMode> LOG_SCAN =
+ ConfigOptions.key("scan")
+ .enumType(LogStartupMode.class)
+ .defaultValue(LogStartupMode.FULL)
+ .withDescription(
+ Description.builder()
+ .text("Specify the startup mode for log consumer.")
+ .linebreak()
+ .list(formatEnumOption(LogStartupMode.FULL))
+ .list(formatEnumOption(LogStartupMode.LATEST))
+ .list(formatEnumOption(LogStartupMode.FROM_TIMESTAMP))
+ .build());
+
+ public static final ConfigOption<Long> LOG_SCAN_TIMESTAMP_MILLS =
+ ConfigOptions.key("scan.timestamp-millis")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional timestamp used in case of \"from-timestamp\" scan mode");
+
+ public static final ConfigOption<Duration> LOG_RETENTION =
+ ConfigOptions.key("retention")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "It means how long the changelog will be kept. The default value is from the log system cluster.");
+
+ public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
+ ConfigOptions.key("consistency")
+ .enumType(LogConsistency.class)
+ .defaultValue(LogConsistency.TRANSACTIONAL)
+ .withDescription(
+ Description.builder()
+ .text("Specify the log consistency mode for table.")
+ .linebreak()
+ .list(
+ formatEnumOption(LogConsistency.TRANSACTIONAL),
+ formatEnumOption(LogConsistency.EVENTUAL))
+ .build());
+
+ public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE =
+ ConfigOptions.key("changelog-mode")
+ .enumType(LogChangelogMode.class)
+ .defaultValue(LogChangelogMode.AUTO)
+ .withDescription(
+ Description.builder()
+ .text("Specify the log changelog mode for table.")
+ .linebreak()
+ .list(
+ formatEnumOption(LogChangelogMode.AUTO),
+ formatEnumOption(LogChangelogMode.ALL),
+ formatEnumOption(LogChangelogMode.UPSERT))
+ .build());
+
+ public static final ConfigOption<String> LOG_KEY_FORMAT =
+ ConfigOptions.key("key.format")
+ .stringType()
+ .defaultValue("json")
+ .withDescription(
+ "Specify the key message format of log system with primary key.");
+
+ public static final ConfigOption<String> LOG_FORMAT =
+ ConfigOptions.key("format")
+ .stringType()
+ .defaultValue("debezium-json")
+ .withDescription("Specify the message format of log system.");
+
+ public long writeBufferSize;
+
+ public int pageSize;
+
+ public long targetFileSize;
+
+ public int numSortedRunCompactionTrigger;
+
+ public int numSortedRunStopTrigger;
+
+ public int numLevels;
+
+ public boolean commitForceCompact;
+
+ public int maxSizeAmplificationPercent;
+
+ public int sizeRatio;
+
+ public boolean enableChangelogFile;
+
+ private Configuration options;
+
+ public CoreOptions(
+ long writeBufferSize,
+ int pageSize,
+ long targetFileSize,
+ int numSortedRunCompactionTrigger,
+ int numSortedRunStopTrigger,
+ Integer numLevels,
+ boolean commitForceCompact,
+ int maxSizeAmplificationPercent,
+ int sizeRatio,
+ boolean enableChangelogFile) {
+ this.writeBufferSize = writeBufferSize;
+ this.pageSize = pageSize;
+ this.targetFileSize = targetFileSize;
+ this.numSortedRunCompactionTrigger = numSortedRunCompactionTrigger;
+ this.numSortedRunStopTrigger =
+ Math.max(numSortedRunCompactionTrigger, numSortedRunStopTrigger);
+ // By default, this ensures that the compaction does not fall to level 0, but at least to
+ // level 1
+ this.numLevels = numLevels == null ? numSortedRunCompactionTrigger + 1 : numLevels;
+ this.commitForceCompact = commitForceCompact;
+ this.maxSizeAmplificationPercent = maxSizeAmplificationPercent;
+ this.sizeRatio = sizeRatio;
+ this.enableChangelogFile = enableChangelogFile;
+ }
+
+ public CoreOptions(Configuration config) {
+ this(
+ config.get(WRITE_BUFFER_SIZE).getBytes(),
+ (int) config.get(PAGE_SIZE).getBytes(),
+ config.get(TARGET_FILE_SIZE).getBytes(),
+ config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER),
+ config.get(NUM_SORTED_RUNS_STOP_TRIGGER),
+ config.get(NUM_LEVELS),
+ config.get(COMMIT_FORCE_COMPACT),
+ config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT),
+ config.get(COMPACTION_SIZE_RATIO),
+ config.get(CHANGELOG_FILE));
+ this.options = config;
+ Preconditions.checkArgument(
+ snapshotNumRetainMin() > 0,
+ SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
+ Preconditions.checkArgument(
+ snapshotNumRetainMin() <= snapshotNumRetainMax(),
+ SNAPSHOT_NUM_RETAINED_MIN.key()
+ + " should not be larger than "
+ + SNAPSHOT_NUM_RETAINED_MAX.key());
+ }
+
+ public CoreOptions(Map<String, String> options) {
+ this(Configuration.fromMap(options));
+ }
+
+ public static Set<ConfigOption<?>> allOptions() {
+ Set<ConfigOption<?>> allOptions = new HashSet<>();
+ allOptions.add(BUCKET);
+ allOptions.add(PATH);
+ allOptions.add(FILE_FORMAT);
+ allOptions.add(MANIFEST_FORMAT);
+ allOptions.add(MANIFEST_TARGET_FILE_SIZE);
+ allOptions.add(MANIFEST_MERGE_MIN_COUNT);
+ allOptions.add(PARTITION_DEFAULT_NAME);
+ allOptions.add(SNAPSHOT_NUM_RETAINED_MIN);
+ allOptions.add(SNAPSHOT_NUM_RETAINED_MAX);
+ allOptions.add(SNAPSHOT_TIME_RETAINED);
+ allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL);
+ allOptions.add(MERGE_ENGINE);
+ allOptions.add(WRITE_MODE);
+ allOptions.add(SOURCE_SPLIT_TARGET_SIZE);
+ allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST);
+ allOptions.add(WRITE_BUFFER_SIZE);
+ allOptions.add(PAGE_SIZE);
+ allOptions.add(TARGET_FILE_SIZE);
+ allOptions.add(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
+ allOptions.add(NUM_SORTED_RUNS_STOP_TRIGGER);
+ allOptions.add(NUM_LEVELS);
+ allOptions.add(COMMIT_FORCE_COMPACT);
+ allOptions.add(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
+ allOptions.add(COMPACTION_SIZE_RATIO);
+ return allOptions;
+ }
+
+ public int bucket() {
+ return options.get(BUCKET);
+ }
+
+ public Path path() {
+ return path(options.toMap());
+ }
+
+ public static Path path(Map<String, String> options) {
+ return new Path(options.get(PATH.key()));
+ }
+
+ public static Path path(Configuration options) {
+ return new Path(options.get(PATH));
+ }
+
+ public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
+ return String.format(
+ "%s.catalog/%s.db/%s",
+ tableIdentifier.getCatalogName(),
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getObjectName());
+ }
+
+ public FileFormat fileFormat() {
+ return FileFormat.fromTableOptions(options, FILE_FORMAT);
+ }
+
+ public FileFormat manifestFormat() {
+ return FileFormat.fromTableOptions(options, MANIFEST_FORMAT);
+ }
+
+ public MemorySize manifestTargetSize() {
+ return options.get(MANIFEST_TARGET_FILE_SIZE);
+ }
+
+ public String partitionDefaultName() {
+ return options.get(PARTITION_DEFAULT_NAME);
+ }
+
+ public int snapshotNumRetainMin() {
+ return options.get(SNAPSHOT_NUM_RETAINED_MIN);
+ }
+
+ public int snapshotNumRetainMax() {
+ return options.get(SNAPSHOT_NUM_RETAINED_MAX);
+ }
+
+ public Duration snapshotTimeRetain() {
+ return options.get(SNAPSHOT_TIME_RETAINED);
+ }
+
+ public int manifestMergeMinCount() {
+ return options.get(MANIFEST_MERGE_MIN_COUNT);
+ }
+
+ public long splitTargetSize() {
+ return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
+ }
+
+ public long splitOpenFileCost() {
+ return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
+ }
+
+ /** Specifies the merge engine for table with primary key. */
+ public enum MergeEngine implements DescribedEnum {
+ DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
+
+ PARTIAL_UPDATE("partial-update", "Partial update non-null fields.");
+
+ private final String value;
+ private final String description;
+
+ MergeEngine(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
+ /** Specifies the startup mode for log consumer. */
+ public enum LogStartupMode implements DescribedEnum {
+ FULL(
+ "full",
+ "Perform a snapshot on the table upon first startup,"
+ + " and continue to read the latest changes."),
+
+ LATEST("latest", "Start from the latest."),
+
+ FROM_TIMESTAMP("from-timestamp", "Start from user-supplied timestamp.");
+
+ private final String value;
+ private final String description;
+
+ LogStartupMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
+ /** Specifies the log consistency mode for table. */
+ public enum LogConsistency implements DescribedEnum {
+ TRANSACTIONAL(
+ "transactional",
+ "Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval."),
+
+ EVENTUAL(
+ "eventual",
+ "Immediate data visibility, you may see some intermediate states, "
+ + "but eventually the right results will be produced, only works for table with primary key.");
+
+ private final String value;
+ private final String description;
+
+ LogConsistency(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
+ /** Specifies the log changelog mode for table. */
+ public enum LogChangelogMode implements DescribedEnum {
+ AUTO("auto", "Upsert for table with primary key, all for table without primary key."),
+
+ ALL("all", "The log system stores all changes including UPDATE_BEFORE."),
+
+ UPSERT(
+ "upsert",
+ "The log system does not store the UPDATE_BEFORE changes, the log consumed job"
+ + " will automatically add the normalized node, relying on the state"
+ + " to generate the required update_before.");
+
+ private final String value;
+ private final String description;
+
+ LogChangelogMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
index 4e3338f5..ff943a19 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.file;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
@@ -37,14 +38,14 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
protected final SchemaManager schemaManager;
protected final long schemaId;
- protected final FileStoreOptions options;
+ protected final CoreOptions options;
protected final String user;
protected final RowType partitionType;
public AbstractFileStore(
SchemaManager schemaManager,
long schemaId,
- FileStoreOptions options,
+ CoreOptions options,
String user,
RowType partitionType) {
this.schemaManager = schemaManager;
@@ -88,7 +89,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
return partitionType;
}
- public FileStoreOptions options() {
+ public CoreOptions options() {
return options;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index c9a8dec2..6236a39f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.file;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreWrite;
@@ -33,7 +34,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
public AppendOnlyFileStore(
SchemaManager schemaManager,
long schemaId,
- FileStoreOptions options,
+ CoreOptions options,
String user,
RowType partitionType,
RowType rowType) {
@@ -61,7 +62,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
pathFactory(),
snapshotManager(),
newScan(true),
- options.mergeTreeOptions().targetFileSize);
+ options.targetFileSize);
}
private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 6d2dc3a8..fb751a73 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.file;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
@@ -41,7 +42,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
public KeyValueFileStore(
SchemaManager schemaManager,
long schemaId,
- FileStoreOptions options,
+ CoreOptions options,
String user,
RowType partitionType,
RowType keyType,
@@ -85,7 +86,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
pathFactory(),
snapshotManager(),
newScan(true),
- options.mergeTreeOptions());
+ options);
}
private KeyValueFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 3c628af0..b06df389 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -20,12 +20,12 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.mergetree.Levels;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
@@ -60,7 +60,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
private final DataFileWriter.Factory dataFileWriterFactory;
private final Supplier<Comparator<RowData>> keyComparatorSupplier;
private final MergeFunction mergeFunction;
- private final MergeTreeOptions options;
+ private final CoreOptions options;
public KeyValueFileStoreWrite(
SchemaManager schemaManager,
@@ -73,7 +73,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
FileStoreScan scan,
- MergeTreeOptions options) {
+ CoreOptions options) {
super(snapshotManager, scan);
this.dataFileReaderFactory =
new DataFileReader.Factory(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index e97b467d..7fc48bc9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.PartitionPathUtils;
@@ -60,7 +60,7 @@ public class FileStorePathFactory {
root,
RowType.of(),
PARTITION_DEFAULT_NAME.defaultValue(),
- FileStoreOptions.FILE_FORMAT.defaultValue());
+ CoreOptions.FILE_FORMAT.defaultValue());
}
// for tables without partition, partitionType should be a row type with 0 columns (not null)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 7032ea06..4ae4db06 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
@@ -83,8 +84,8 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
TableFactoryHelper helper) {
DecodingFormat<DeserializationSchema<RowData>> format =
helper.discoverDecodingFormat(
- DeserializationFormatFactory.class, LogOptions.KEY_FORMAT);
- validateKeyFormat(format, helper.getOptions().get(LogOptions.KEY_FORMAT));
+ DeserializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT);
+ validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT));
return format;
}
@@ -92,8 +93,8 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
TableFactoryHelper helper) {
EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(
- SerializationFormatFactory.class, LogOptions.KEY_FORMAT);
- validateKeyFormat(format, helper.getOptions().get(LogOptions.KEY_FORMAT));
+ SerializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT);
+ validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT));
return format;
}
@@ -101,16 +102,17 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
TableFactoryHelper helper) {
DecodingFormat<DeserializationSchema<RowData>> format =
helper.discoverDecodingFormat(
- DeserializationFormatFactory.class, LogOptions.FORMAT);
- validateValueFormat(format, helper.getOptions().get(LogOptions.FORMAT));
+ DeserializationFormatFactory.class, CoreOptions.LOG_FORMAT);
+ validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT));
return format;
}
static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
TableFactoryHelper helper) {
EncodingFormat<SerializationSchema<RowData>> format =
- helper.discoverEncodingFormat(SerializationFormatFactory.class, LogOptions.FORMAT);
- validateValueFormat(format, helper.getOptions().get(LogOptions.FORMAT));
+ helper.discoverEncodingFormat(
+ SerializationFormatFactory.class, CoreOptions.LOG_FORMAT);
+ validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT));
return format;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 4fdede94..103dc3a9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.store.table;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.AppendOnlyFileStore;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
@@ -59,7 +59,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
new AppendOnlyFileStore(
schemaManager,
tableSchema.id(),
- new FileStoreOptions(tableSchema.options()),
+ new CoreOptions(tableSchema.options()),
user,
tableSchema.logicalPartitionType(),
tableSchema.logicalRowType());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index d622f6ec..2830700f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.WriteMode;
@@ -67,7 +67,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
new KeyValueFileStore(
schemaManager,
tableSchema.id(),
- new FileStoreOptions(tableSchema.options()),
+ new CoreOptions(tableSchema.options()),
user,
tableSchema.logicalPartitionType(),
tableSchema.logicalRowType(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 1c32917f..585caffd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.WriteMode;
@@ -80,7 +80,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
Configuration conf = Configuration.fromMap(tableSchema.options());
- FileStoreOptions.MergeEngine mergeEngine = conf.get(FileStoreOptions.MERGE_ENGINE);
+ CoreOptions.MergeEngine mergeEngine = conf.get(CoreOptions.MERGE_ENGINE);
MergeFunction mergeFunction;
switch (mergeEngine) {
case DEDUPLICATE:
@@ -102,7 +102,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
new KeyValueFileStore(
schemaManager,
tableSchema.id(),
- new FileStoreOptions(conf),
+ new CoreOptions(conf),
user,
tableSchema.logicalPartitionType(),
keyType,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index 82a1a039..440222f8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -20,14 +20,14 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import java.util.UUID;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.PATH;
/** Factory to create {@link FileStoreTable}. */
public class FileStoreTableFactory {
@@ -43,7 +43,7 @@ public class FileStoreTableFactory {
}
public static FileStoreTable create(Configuration conf, String user) {
- Path tablePath = FileStoreOptions.path(conf);
+ Path tablePath = CoreOptions.path(conf);
TableSchema tableSchema =
new SchemaManager(tablePath)
.latest()
@@ -69,7 +69,7 @@ public class FileStoreTableFactory {
tableSchema = tableSchema.copy(newOptions.toMap());
SchemaManager schemaManager = new SchemaManager(tablePath);
- if (newOptions.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
+ if (newOptions.get(CoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
return new AppendOnlyFileStoreTable(tablePath, schemaManager, tableSchema, user);
} else {
if (tableSchema.primaryKeys().isEmpty()) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
index 215840a1..1e997477 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
@@ -18,11 +18,10 @@
package org.apache.flink.table.store.table.sink;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.memory.MemoryPoolFactory;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.writer.RecordWriter;
@@ -39,15 +38,11 @@ public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T> {
private final MemoryPoolFactory memoryPoolFactory;
protected MemoryTableWrite(
- FileStoreWrite<T> write,
- SinkRecordConverter recordConverter,
- FileStoreOptions options) {
+ FileStoreWrite<T> write, SinkRecordConverter recordConverter, CoreOptions options) {
super(write, recordConverter);
- MergeTreeOptions mergeTreeOptions = options.mergeTreeOptions();
HeapMemorySegmentPool memoryPool =
- new HeapMemorySegmentPool(
- mergeTreeOptions.writeBufferSize, mergeTreeOptions.pageSize);
+ new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize);
this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, this::memoryOwners);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
index e46fd09f..e096cf00 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -97,8 +98,8 @@ public class FileFormatTest {
public FileFormat createFileFormat(String codec) {
Configuration tableOptions = new Configuration();
- tableOptions.set(FileStoreOptions.FILE_FORMAT, "avro");
+ tableOptions.set(CoreOptions.FILE_FORMAT, "avro");
tableOptions.setString("avro.codec", codec);
- return FileFormat.fromTableOptions(tableOptions, FileStoreOptions.FILE_FORMAT);
+ return FileFormat.fromTableOptions(tableOptions, CoreOptions.FILE_FORMAT);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index fb290140..a7ff2a86 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -31,7 +32,6 @@ import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
@@ -93,26 +93,26 @@ public class TestFileStore extends KeyValueFileStore {
MergeFunction mergeFunction) {
Configuration conf = new Configuration();
- conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, WRITE_BUFFER_SIZE);
- conf.set(MergeTreeOptions.PAGE_SIZE, PAGE_SIZE);
- conf.set(MergeTreeOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
+ conf.set(CoreOptions.WRITE_BUFFER_SIZE, WRITE_BUFFER_SIZE);
+ conf.set(CoreOptions.PAGE_SIZE, PAGE_SIZE);
+ conf.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
conf.set(
- FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
+ CoreOptions.MANIFEST_TARGET_FILE_SIZE,
MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
- conf.set(FileStoreOptions.FILE_FORMAT, format);
- conf.set(FileStoreOptions.MANIFEST_FORMAT, format);
- conf.set(FileStoreOptions.PATH, root);
- conf.set(FileStoreOptions.BUCKET, numBuckets);
+ conf.set(CoreOptions.FILE_FORMAT, format);
+ conf.set(CoreOptions.MANIFEST_FORMAT, format);
+ conf.set(CoreOptions.PATH, root);
+ conf.set(CoreOptions.BUCKET, numBuckets);
return new TestFileStore(
- root, new FileStoreOptions(conf), partitionType, keyType, valueType, mergeFunction);
+ root, new CoreOptions(conf), partitionType, keyType, valueType, mergeFunction);
}
private TestFileStore(
String root,
- FileStoreOptions options,
+ CoreOptions options,
RowType partitionType,
RowType keyType,
RowType valueType,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index 5cfb2f73..3fdeb330 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.writer.RecordWriter;
@@ -215,7 +215,7 @@ public class AppendOnlyWriterTest {
new Path(tempDir.toString()),
"dt=" + PART,
1,
- FileStoreOptions.FILE_FORMAT.defaultValue());
+ CoreOptions.FILE_FORMAT.defaultValue());
}
private RecordWriter<RowData> createWriter(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFilePathFactoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFilePathFactoryTest.java
index cc4448b0..ad129ede 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFilePathFactoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFilePathFactoryTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.file.data;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -38,7 +38,7 @@ public class DataFilePathFactoryTest {
new Path(tempDir.toString()),
"",
123,
- FileStoreOptions.FILE_FORMAT.defaultValue());
+ CoreOptions.FILE_FORMAT.defaultValue());
String uuid = pathFactory.uuid();
for (int i = 0; i < 20; i++) {
@@ -51,7 +51,7 @@ public class DataFilePathFactoryTest {
+ "-"
+ i
+ "."
- + FileStoreOptions.FILE_FORMAT.defaultValue()));
+ + CoreOptions.FILE_FORMAT.defaultValue()));
}
assertThat(pathFactory.toPath("my-data-file-name"))
.isEqualTo(new Path(tempDir.toString() + "/bucket-123/my-data-file-name"));
@@ -64,7 +64,7 @@ public class DataFilePathFactoryTest {
new Path(tempDir.toString()),
"dt=20211224",
123,
- FileStoreOptions.FILE_FORMAT.defaultValue());
+ CoreOptions.FILE_FORMAT.defaultValue());
String uuid = pathFactory.uuid();
for (int i = 0; i < 20; i++) {
@@ -77,7 +77,7 @@ public class DataFilePathFactoryTest {
+ "-"
+ i
+ "."
- + FileStoreOptions.FILE_FORMAT.defaultValue()));
+ + CoreOptions.FILE_FORMAT.defaultValue()));
}
assertThat(pathFactory.toPath("my-data-file-name"))
.isEqualTo(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 43811b7a..8bb8194f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
@@ -146,7 +146,7 @@ public class ManifestFileMetaTest {
new Path(path),
PARTITION_TYPE,
"default",
- FileStoreOptions.FILE_FORMAT.defaultValue()),
+ CoreOptions.FILE_FORMAT.defaultValue()),
Long.MAX_VALUE)
.create();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 3ad75724..98e83fdb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.file.manifest;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -97,7 +97,7 @@ public class ManifestFileTest {
new Path(path),
DEFAULT_PART_TYPE,
"default",
- FileStoreOptions.FILE_FORMAT.defaultValue());
+ CoreOptions.FILE_FORMAT.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
return new ManifestFile.Factory(
new SchemaManager(new Path(path)),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 6064c1cf..95bfca51 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.file.manifest;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -94,7 +94,7 @@ public class ManifestListTest {
new Path(path),
TestKeyValueGenerator.DEFAULT_PART_TYPE,
"default",
- FileStoreOptions.FILE_FORMAT.defaultValue());
+ CoreOptions.FILE_FORMAT.defaultValue());
return new ManifestList.Factory(TestKeyValueGenerator.DEFAULT_PART_TYPE, avro, pathFactory)
.create();
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 18ad6073..2de27d14 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
@@ -81,7 +82,7 @@ public class MergeTreeTest {
private FileStorePathFactory pathFactory;
private Comparator<RowData> comparator;
- private MergeTreeOptions options;
+ private CoreOptions options;
private DataFileReader dataFileReader;
private DataFileWriter dataFileWriter;
private RecordWriter<KeyValue> writer;
@@ -98,10 +99,10 @@ public class MergeTreeTest {
private void recreateMergeTree(long targetFileSize) {
Configuration configuration = new Configuration();
- configuration.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
- configuration.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096));
- configuration.set(MergeTreeOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
- options = new MergeTreeOptions(configuration);
+ configuration.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+ configuration.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+ configuration.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
+ options = new CoreOptions(configuration);
RowType keyType = new RowType(singletonList(new RowType.RowField("k", new IntType())));
RowType valueType = new RowType(singletonList(new RowType.RowField("v", new IntType())));
FileFormat flushingAvro = new FlushingFileFormat("avro");
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
index 154ec613..c2ca6a68 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactoryTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -84,7 +84,7 @@ public class FileStorePathFactoryTest {
new LogicalType[] {new VarCharType(10), new IntType()},
new String[] {"dt", "hr"}),
"default",
- FileStoreOptions.FILE_FORMAT.defaultValue());
+ CoreOptions.FILE_FORMAT.defaultValue());
assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
assertPartition("20211224", null, pathFactory, "/dt=20211224/hr=default");
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 85579d09..67778c2d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
@@ -163,9 +163,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
protected FileStoreTable createFileStoreTable() throws Exception {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
- conf.set(FileStoreOptions.PATH, tablePath.toString());
- conf.set(FileStoreOptions.FILE_FORMAT, "avro");
- conf.set(FileStoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.FILE_FORMAT, "avro");
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema tableSchema =
schemaManager.commitNewVersion(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index fcc1a3f8..7e5422dc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
@@ -166,9 +166,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
protected FileStoreTable createFileStoreTable() throws Exception {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
- conf.set(FileStoreOptions.PATH, tablePath.toString());
- conf.set(FileStoreOptions.FILE_FORMAT, "avro");
- conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.FILE_FORMAT, "avro");
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema tableSchema =
schemaManager.commitNewVersion(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 6d12a2cd..63d238ff 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -21,9 +21,8 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -194,10 +193,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
protected FileStoreTable createFileStoreTable(boolean changelogFile) throws Exception {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
- conf.set(FileStoreOptions.PATH, tablePath.toString());
- conf.set(FileStoreOptions.FILE_FORMAT, "avro");
- conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
- conf.set(MergeTreeOptions.CHANGELOG_FILE, changelogFile);
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.FILE_FORMAT, "avro");
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ conf.set(CoreOptions.CHANGELOG_FILE, changelogFile);
SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema tableSchema =
schemaManager.commitNewVersion(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 4c6d02fe..b9f77bbd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -22,9 +22,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -89,11 +88,11 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
protected FileStoreTable createFileStoreTable() throws Exception {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
- conf.set(FileStoreOptions.PATH, tablePath.toString());
- conf.set(FileStoreOptions.FILE_FORMAT, "avro");
- conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
- conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
- conf.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(1024));
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.FILE_FORMAT, "avro");
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
+ conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema schema =
schemaManager.commitNewVersion(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 98f502ac..1e63b051 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -19,10 +19,10 @@
package org.apache.flink.table.store.mapred;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.SearchArgumentToPredicateConverter;
import org.apache.flink.table.store.TableStoreJobConf;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTable;
@@ -71,7 +71,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
private FileStoreTable createFileStoreTable(JobConf jobConf) {
TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
Configuration conf = new Configuration();
- conf.set(FileStoreOptions.PATH, wrapper.getLocation());
+ conf.set(CoreOptions.PATH, wrapper.getLocation());
return FileStoreTableFactory.create(conf, wrapper.getFileStoreUser());
}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
index 10721be9..05ef9fbf 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.FileStoreTable;
@@ -29,7 +28,7 @@ import org.apache.flink.table.types.logical.RowType;
import java.util.List;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static org.apache.flink.table.store.CoreOptions.PATH;
/** Test utils related to {@link org.apache.flink.table.store.file.FileStore}. */
public class FileStoreTestUtils {
@@ -40,7 +39,7 @@ public class FileStoreTestUtils {
List<String> partitionKeys,
List<String> primaryKeys)
throws Exception {
- Path tablePath = FileStoreOptions.path(conf);
+ Path tablePath = CoreOptions.path(conf);
new SchemaManager(tablePath)
.commitNewVersion(
new UpdateSchema(rowType, partitionKeys, primaryKeys, conf.toMap(), ""));
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 00e19636..05cff356 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -24,8 +24,8 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.FileStoreTestUtils;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.hive.objectinspector.TableStoreObjectInspectorFactory;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableWrite;
@@ -84,9 +84,9 @@ public class TableStoreHiveStorageHandlerITCase {
public void testReadExternalTableWithPk() throws Exception {
String path = folder.newFolder().toURI().toString();
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, path);
- conf.setInteger(FileStoreOptions.BUCKET, 2);
- conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.setString(CoreOptions.PATH, path);
+ conf.setInteger(CoreOptions.BUCKET, 2);
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
FileStoreTable table =
FileStoreTestUtils.createFileStoreTable(
conf,
@@ -126,9 +126,9 @@ public class TableStoreHiveStorageHandlerITCase {
public void testReadExternalTableWithoutPk() throws Exception {
String path = folder.newFolder().toURI().toString();
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, path);
- conf.setInteger(FileStoreOptions.BUCKET, 2);
- conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.setString(CoreOptions.PATH, path);
+ conf.setInteger(CoreOptions.BUCKET, 2);
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
FileStoreTable table =
FileStoreTestUtils.createFileStoreTable(
conf,
@@ -169,8 +169,8 @@ public class TableStoreHiveStorageHandlerITCase {
public void testReadAllSupportedTypes() throws Exception {
String root = folder.newFolder().toString();
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, root);
- conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.setString(CoreOptions.PATH, root);
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
FileStoreTable table =
FileStoreTestUtils.createFileStoreTable(
conf,
@@ -284,8 +284,8 @@ public class TableStoreHiveStorageHandlerITCase {
public void testPredicatePushDown() throws Exception {
String path = folder.newFolder().toURI().toString();
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, path);
- conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.setString(CoreOptions.PATH, path);
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
FileStoreTable table =
FileStoreTestUtils.createFileStoreTable(
conf,
@@ -374,8 +374,8 @@ public class TableStoreHiveStorageHandlerITCase {
public void testDateAndTimestamp() throws Exception {
String path = folder.newFolder().toURI().toString();
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, path);
- conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.setString(CoreOptions.PATH, path);
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
FileStoreTable table =
FileStoreTestUtils.createFileStoreTable(
conf,
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 6befdfff..610dea4f 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -26,9 +26,9 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.FileStoreTestUtils;
import org.apache.flink.table.store.RowDataContainer;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.FileStoreTable;
@@ -57,8 +57,8 @@ public class TableStoreRecordReaderTest {
@Test
public void testPk() throws Exception {
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, tempDir.toString());
- conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.setString(CoreOptions.PATH, tempDir.toString());
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
FileStoreTable table =
FileStoreTestUtils.createFileStoreTable(
conf,
@@ -98,8 +98,8 @@ public class TableStoreRecordReaderTest {
@Test
public void testValueCount() throws Exception {
Configuration conf = new Configuration();
- conf.setString(FileStoreOptions.PATH, tempDir.toString());
- conf.setString(FileStoreOptions.FILE_FORMAT, "avro");
+ conf.setString(CoreOptions.PATH, tempDir.toString());
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
FileStoreTable table =
FileStoreTestUtils.createFileStoreTable(
conf,
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
index bf0aa4c5..22d9d3f5 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.types.RowKind;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
index 7365e44d..7698bcfe 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index 861f77bb..59c0d4ab 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -24,8 +24,8 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.log.LogOptions.LogConsistency;
-import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.types.DataType;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index b247f575..85870e9f 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -55,16 +55,16 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LOG_FORMAT;
+import static org.apache.flink.table.store.CoreOptions.LOG_KEY_FORMAT;
+import static org.apache.flink.table.store.CoreOptions.LOG_RETENTION;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
+import static org.apache.flink.table.store.CoreOptions.LogConsistency;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
-import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.FORMAT;
-import static org.apache.flink.table.store.log.LogOptions.KEY_FORMAT;
-import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
-import static org.apache.flink.table.store.log.LogOptions.RETENTION;
-import static org.apache.flink.table.store.log.LogOptions.SCAN;
-import static org.apache.flink.table.store.log.LogOptions.SCAN_TIMESTAMP_MILLS;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
/** The Kafka {@link LogStoreTableFactory} implementation. */
@@ -89,14 +89,14 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
- options.add(SCAN);
+ options.add(LOG_SCAN);
options.add(TOPIC);
- options.add(SCAN_TIMESTAMP_MILLS);
- options.add(RETENTION);
- options.add(CONSISTENCY);
- options.add(CHANGELOG_MODE);
- options.add(KEY_FORMAT);
- options.add(FORMAT);
+ options.add(LOG_SCAN_TIMESTAMP_MILLS);
+ options.add(LOG_RETENTION);
+ options.add(LOG_CONSISTENCY);
+ options.add(LOG_CHANGELOG_MODE);
+ options.add(LOG_KEY_FORMAT);
+ options.add(LOG_FORMAT);
return options;
}
@@ -124,7 +124,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
try (AdminClient adminClient = AdminClient.create(toKafkaProperties(helper.getOptions()))) {
Map<String, String> configs = new HashMap<>();
helper.getOptions()
- .getOptional(RETENTION)
+ .getOptional(LOG_RETENTION)
.ifPresent(
retention ->
configs.put(
@@ -210,9 +210,9 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
primaryKeyDeserializer,
valueDeserializer,
projectFields,
- helper.getOptions().get(CONSISTENCY),
- helper.getOptions().get(SCAN),
- helper.getOptions().get(SCAN_TIMESTAMP_MILLS));
+ helper.getOptions().get(LOG_CONSISTENCY),
+ helper.getOptions().get(LOG_SCAN),
+ helper.getOptions().get(LOG_SCAN_TIMESTAMP_MILLS));
}
@Override
@@ -237,8 +237,8 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
toKafkaProperties(helper.getOptions()),
primaryKeySerializer,
valueSerializer,
- helper.getOptions().get(CONSISTENCY),
- helper.getOptions().get(CHANGELOG_MODE));
+ helper.getOptions().get(LOG_CONSISTENCY),
+ helper.getOptions().get(LOG_CHANGELOG_MODE));
}
private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
@@ -261,7 +261,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
optionMap.get(key)));
// Add read committed for transactional consistency mode.
- if (options.get(CONSISTENCY) == LogConsistency.TRANSACTIONAL) {
+ if (options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL) {
properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed");
}
return properties;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index 30c66cfa..e6f39c90 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -23,9 +23,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
import org.apache.flink.table.store.file.utils.BlockingIterator;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import org.apache.flink.table.store.log.LogOptions.LogConsistency;
import org.apache.flink.types.RowKind;
import org.junit.Assert;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
index 35ff9960..7f4f5df7 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.kafka;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index ac225429..343525e6 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -54,13 +54,13 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import static org.apache.flink.table.store.CoreOptions.LogConsistency;
import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
-import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
-import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
-import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
-import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
/** Utils for the test of {@link KafkaLogStoreFactory}. */
public class KafkaLogTestUtils {
@@ -188,8 +188,8 @@ public class KafkaLogTestUtils {
RowType type,
int[] keys) {
Map<String, String> options = new HashMap<>();
- options.put(CHANGELOG_MODE.key(), changelogMode.toString());
- options.put(CONSISTENCY.key(), consistency.toString());
+ options.put(LOG_CHANGELOG_MODE.key(), changelogMode.toString());
+ options.put(LOG_CONSISTENCY.key(), consistency.toString());
options.put(BOOTSTRAP_SERVERS.key(), servers);
options.put(TOPIC.key(), UUID.randomUUID().toString());
return createContext(name, type, keys, options);
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 7231752f..14656cb5 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.FileStoreTable;
@@ -44,7 +44,7 @@ public class SimpleTableTestHelper {
public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
Map<String, String> options = new HashMap<>();
// orc is shaded, can not find shaded classes in ide
- options.put(FileStoreOptions.FILE_FORMAT.key(), "avro");
+ options.put(CoreOptions.FILE_FORMAT.key(), "avro");
new SchemaManager(path)
.commitNewVersion(
new UpdateSchema(