You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/05 06:43:41 UTC
[flink-table-store] branch master updated: [FLINK-28387] Organizing Options in table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new c233679e [FLINK-28387] Organizing Options in table store
c233679e is described below
commit c233679e5e0a3dea7d98c4d3487466d1139da6b8
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 5 14:43:37 2022 +0800
[FLINK-28387] Organizing Options in table store
This closes #197
---
docs/content/docs/development/create-table.md | 4 +-
.../store/connector/TableStoreManagedFactory.java | 30 +--
.../store/connector/AbstractTableStoreFactory.java | 51 ++--
...toryOptions.java => FlinkConnectorOptions.java} | 41 ++-
.../store/connector/sink/FlinkSinkBuilder.java | 6 +-
.../table/store/connector/sink/TableStoreSink.java | 30 +--
.../store/connector/source/FlinkSourceBuilder.java | 25 +-
.../store/connector/source/TableStoreSource.java | 33 ++-
.../ComputedColumnAndWatermarkTableITCase.java | 16 +-
.../table/store/connector/CreateTableITCase.java | 7 +-
.../table/store/connector/DropTableITCase.java | 4 +-
.../store/connector/FileStoreTableITCase.java | 6 +-
.../store/connector/FileSystemCatalogITCase.java | 8 +-
.../store/connector/ReadWriteTableITCase.java | 6 +-
.../store/connector/ReadWriteTableTestBase.java | 23 +-
.../store/connector/StreamingWarehouseITCase.java | 2 +-
.../connector/TableStoreManagedFactoryTest.java | 44 +---
.../table/store/connector/TableStoreTestBase.java | 14 +-
.../store/connector/sink/LogStoreSinkITCase.java | 9 +-
.../source/TestChangelogDataReadWrite.java | 2 +-
.../apache/flink/table/store/CatalogOptions.java | 25 +-
.../org/apache/flink/table/store/CoreOptions.java | 200 ++++++--------
.../table/store/file/AppendOnlyFileStore.java | 2 +-
.../flink/table/store/file/FileStoreOptions.java | 289 ---------------------
.../flink/table/store/file/KeyValueFileStore.java | 2 +
.../table/store/file/catalog/CatalogFactory.java | 16 +-
.../store/file/mergetree/MergeTreeOptions.java | 178 -------------
.../file/operation/KeyValueFileStoreWrite.java | 26 +-
.../apache/flink/table/store/log/LogOptions.java | 194 --------------
.../table/store/log/LogStoreTableFactory.java | 24 +-
.../table/store/table/sink/MemoryTableWrite.java | 2 +-
.../table/store/file/mergetree/MergeTreeTest.java | 19 +-
.../flink/table/store/tests/LogStoreE2eTest.java | 2 +-
.../flink/table/store/hive/HiveCatalogFactory.java | 10 +-
.../flink/table/store/kafka/KafkaLogOptions.java | 2 +-
.../table/store/kafka/KafkaLogStoreFactory.java | 21 +-
.../flink/table/store/kafka/KafkaLogTestUtils.java | 4 +-
37 files changed, 310 insertions(+), 1067 deletions(-)
diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md
index cba5a9da..cec6c481 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -115,14 +115,14 @@ Important options include the following:
<td>The log system used to keep changes of the table, supports 'kafka'.</td>
</tr>
<tr>
- <td><h5>log.kafka.bootstrap.servers</h5></td>
+ <td><h5>kafka.bootstrap.servers</h5></td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Required Kafka server connection string for log store.</td>
</tr>
<tr>
- <td><h5>log.topic</h5></td>
+ <td><h5>kafka.topic</h5></td>
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
diff --git a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 431e5c9b..37dfda61 100644
--- a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -18,11 +18,9 @@
package org.apache.flink.table.store.connector;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.ManagedTableFactory;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -31,6 +29,7 @@ import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.util.Preconditions;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
@@ -39,13 +38,13 @@ import java.util.Objects;
import java.util.Optional;
import static org.apache.flink.table.store.CoreOptions.BUCKET;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
import static org.apache.flink.table.store.CoreOptions.PATH;
-import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
import static org.apache.flink.table.store.CoreOptions.WRITE_MODE;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_PARTITION_SPEC;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY;
/** Default implementation of {@link ManagedTableFactory}. */
@@ -85,21 +84,12 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
createOptionalLogStoreFactory(context.getClassLoader(), enrichedOptions);
logFactory.ifPresent(
factory ->
- factory.enrichOptions(createLogContext(context, enrichedOptions))
- .forEach((k, v) -> enrichedOptions.putIfAbsent(LOG_PREFIX + k, v)));
+ factory.enrichOptions(new TableStoreDynamicContext(context, enrichedOptions))
+ .forEach(enrichedOptions::putIfAbsent));
return enrichedOptions;
}
- @VisibleForTesting
- static String relativeTablePath(ObjectIdentifier tableIdentifier) {
- return String.format(
- "%s.catalog/%s.db/%s",
- tableIdentifier.getCatalogName(),
- tableIdentifier.getDatabaseName(),
- tableIdentifier.getObjectName());
- }
-
@Override
public void onCreateTable(Context context, boolean ignoreIfExists) {
Map<String, String> options = context.getCatalogTable().getOptions();
@@ -151,7 +141,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
.ifPresent(
factory ->
factory.onCreateTable(
- createLogContext(context),
+ context,
Integer.parseInt(
options.getOrDefault(
BUCKET.key(),
@@ -179,7 +169,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory
createOptionalLogStoreFactory(context)
.ifPresent(
factory ->
- factory.onDropTable(createLogContext(context), ignoreIfNotExists));
+ factory.onDropTable(context, ignoreIfNotExists));
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index 54de82ff..4f729bf6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -21,13 +21,15 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
import org.apache.flink.table.store.connector.source.TableStoreSource;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -39,17 +41,16 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
/** Abstract table store factory to create table source and table sink. */
@@ -63,7 +64,7 @@ public abstract class AbstractTableStoreFactory
buildFileStoreTable(context),
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING,
- createLogContext(context),
+ context,
createOptionalLogStoreFactory(context).orElse(null));
}
@@ -72,7 +73,7 @@ public abstract class AbstractTableStoreFactory
return new TableStoreSink(
context.getObjectIdentifier(),
buildFileStoreTable(context),
- createLogContext(context),
+ context,
createOptionalLogStoreFactory(context).orElse(null));
}
@@ -83,9 +84,9 @@ public abstract class AbstractTableStoreFactory
@Override
public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = CoreOptions.allOptions();
- options.addAll(CoreOptions.allOptions());
- options.addAll(TableStoreFactoryOptions.allOptions());
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.addAll(FlinkConnectorOptions.getOptions());
+ options.addAll(CoreOptions.getOptions());
return options;
}
@@ -112,44 +113,24 @@ public abstract class AbstractTableStoreFactory
}
private static void validateFileStoreContinuous(Configuration options) {
- Configuration logOptions = new DelegatingConfiguration(options, LOG_PREFIX);
- CoreOptions.LogChangelogMode changelogMode = logOptions.get(LOG_CHANGELOG_MODE);
- if (changelogMode == CoreOptions.LogChangelogMode.UPSERT) {
+ LogChangelogMode changelogMode = options.get(LOG_CHANGELOG_MODE);
+ if (changelogMode == LogChangelogMode.UPSERT) {
throw new ValidationException(
"File store continuous reading dose not support upsert changelog mode.");
}
- CoreOptions.LogConsistency consistency = logOptions.get(LOG_CONSISTENCY);
- if (consistency == CoreOptions.LogConsistency.EVENTUAL) {
+ LogConsistency consistency = options.get(LOG_CONSISTENCY);
+ if (consistency == LogConsistency.EVENTUAL) {
throw new ValidationException(
"File store continuous reading dose not support eventual consistency mode.");
}
- CoreOptions.LogStartupMode startupMode = logOptions.get(LOG_SCAN);
- if (startupMode == CoreOptions.LogStartupMode.FROM_TIMESTAMP) {
+ LogStartupMode startupMode = options.get(LOG_SCAN);
+ if (startupMode == LogStartupMode.FROM_TIMESTAMP) {
throw new ValidationException(
"File store continuous reading dose not support from_timestamp scan mode, "
+ "you can add timestamp filters instead.");
}
}
- static DynamicTableFactory.Context createLogContext(DynamicTableFactory.Context context) {
- return createLogContext(context, context.getCatalogTable().getOptions());
- }
-
- static DynamicTableFactory.Context createLogContext(
- DynamicTableFactory.Context context, Map<String, String> options) {
- return new TableStoreDynamicContext(context, filterLogStoreOptions(options));
- }
-
- static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
- return options.entrySet().stream()
- .filter(entry -> !entry.getKey().equals(LOG_SYSTEM.key())) // exclude log.system
- .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
- .collect(
- Collectors.toMap(
- entry -> entry.getKey().substring(LOG_PREFIX.length()),
- Map.Entry::getValue));
- }
-
static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context context) {
FileStoreTable table =
FileStoreTableFactory.create(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
similarity index 71%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
index 38d6a209..e74a8e5d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
@@ -21,14 +21,19 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.FactoryUtil;
-import java.util.HashSet;
-import java.util.Set;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
-/** Options for {@link TableStoreManagedFactory}. */
-public class TableStoreFactoryOptions {
+/** Options for flink connector. */
+public class FlinkConnectorOptions {
+ public static final String TABLE_STORE_PREFIX = "table-store.";
+
+ @Internal
public static final ConfigOption<String> ROOT_PATH =
ConfigOptions.key("root-path")
.stringType()
@@ -68,11 +73,27 @@ public class TableStoreFactoryOptions {
+ "By default, if this option is not defined, the planner will derive the parallelism "
+ "for each statement individually by also considering the global configuration.");
- public static Set<ConfigOption<?>> allOptions() {
- Set<ConfigOption<?>> allOptions = new HashSet<>();
- allOptions.add(LOG_SYSTEM);
- allOptions.add(SINK_PARALLELISM);
- allOptions.add(SCAN_PARALLELISM);
- return allOptions;
+ public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
+ return String.format(
+ "%s.catalog/%s.db/%s",
+ tableIdentifier.getCatalogName(),
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getObjectName());
+ }
+
+ @Internal
+ public static List<ConfigOption<?>> getOptions() {
+ final Field[] fields = FlinkConnectorOptions.class.getFields();
+ final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
+ for (Field field : fields) {
+ if (ConfigOption.class.isAssignableFrom(field.getType())) {
+ try {
+ list.add((ConfigOption<?>) field.get(FlinkConnectorOptions.class));
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return list;
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index d0d9cccb..fa3cfeb4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
+import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
@@ -81,7 +81,7 @@ public class FlinkSinkBuilder {
@SuppressWarnings("unchecked")
@Nullable
private Map<String, String> getCompactPartSpec() {
- String json = conf.get(TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC);
+ String json = conf.get(FlinkConnectorOptions.COMPACTION_PARTITION_SPEC);
if (json == null) {
return null;
}
@@ -103,7 +103,7 @@ public class FlinkSinkBuilder {
new StoreSink(
tableIdentifier,
table,
- conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED),
+ conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED),
getCompactPartSpec(),
lockFactory,
overwritePartition,
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 0f4e5dac..f2fcb843 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.connector.sink;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -28,9 +27,9 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
-import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
@@ -45,12 +44,14 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+
/** Table sink to create {@link StoreSink}. */
public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
private final ObjectIdentifier tableIdentifier;
private final FileStoreTable table;
- private final DynamicTableFactory.Context logStoreContext;
+ private final DynamicTableFactory.Context context;
@Nullable private final LogStoreTableFactory logStoreTableFactory;
private Map<String, String> staticPartitions = new HashMap<>();
@@ -60,11 +61,11 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
public TableStoreSink(
ObjectIdentifier tableIdentifier,
FileStoreTable table,
- DynamicTableFactory.Context logStoreContext,
+ DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this.tableIdentifier = tableIdentifier;
this.table = table;
- this.logStoreContext = logStoreContext;
+ this.context = context;
this.logStoreTableFactory = logStoreTableFactory;
}
@@ -76,12 +77,8 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
// no primary key, sink all changelogs
return requestedMode;
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
- Configuration logOptions =
- new DelegatingConfiguration(
- Configuration.fromMap(table.schema().options()),
- CoreOptions.LOG_PREFIX);
- if (logOptions.get(CoreOptions.LOG_CHANGELOG_MODE)
- != CoreOptions.LogChangelogMode.ALL) {
+ Configuration options = Configuration.fromMap(table.schema().options());
+ if (options.get(LOG_CHANGELOG_MODE) != LogChangelogMode.ALL) {
// with primary key, default sink upsert
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (RowKind kind : requestedMode.getContainedKinds()) {
@@ -110,13 +107,13 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
LogSinkProvider logSinkProvider = null;
if (logStoreTableFactory != null) {
- logSinkProvider = logStoreTableFactory.createSinkProvider(logStoreContext, context);
+ logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
}
Configuration conf = Configuration.fromMap(table.schema().options());
// Do not sink to log store when overwrite mode
final LogSinkFunction logSinkFunction =
- overwrite || conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED)
+ overwrite || conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED)
? null
: (logSinkProvider == null ? null : logSinkProvider.createSink());
return new TableStoreDataStreamSinkProvider(
@@ -129,15 +126,14 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
.withLockFactory(lockFactory)
.withLogSinkFunction(logSinkFunction)
.withOverwritePartition(overwrite ? staticPartitions : null)
- .withParallelism(
- conf.get(TableStoreFactoryOptions.SINK_PARALLELISM))
+ .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
.build());
}
@Override
public DynamicTableSink copy() {
TableStoreSink copied =
- new TableStoreSink(tableIdentifier, table, logStoreContext, logStoreTableFactory);
+ new TableStoreSink(tableIdentifier, table, context, logStoreTableFactory);
copied.staticPartitions = new HashMap<>(staticPartitions);
copied.overwrite = overwrite;
copied.lockFactory = lockFactory;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 396f6d94..2bdaf7c3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -30,8 +29,8 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.MergeEngine;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.table.FileStoreTable;
@@ -43,6 +42,11 @@ import javax.annotation.Nullable;
import java.util.Optional;
+import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
+import static org.apache.flink.table.store.CoreOptions.MERGE_ENGINE;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
+
/** Source builder to build a Flink {@link Source}. */
public class FlinkSourceBuilder {
@@ -94,7 +98,7 @@ public class FlinkSourceBuilder {
}
private long discoveryIntervalMills() {
- return conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
+ return conf.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
}
private FileStoreSource buildFileSource(boolean isContinuous, boolean continuousScanLatest) {
@@ -111,19 +115,16 @@ public class FlinkSourceBuilder {
if (isContinuous) {
// TODO move validation to a dedicated method
if (table.schema().primaryKeys().size() > 0
- && conf.get(CoreOptions.MERGE_ENGINE)
- == CoreOptions.MergeEngine.PARTIAL_UPDATE) {
+ && conf.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) {
throw new ValidationException(
"Partial update continuous reading is not supported.");
}
- CoreOptions.LogStartupMode startupMode =
- new DelegatingConfiguration(conf, CoreOptions.LOG_PREFIX)
- .get(CoreOptions.LOG_SCAN);
+ LogStartupMode startupMode = conf.get(LOG_SCAN);
if (logSourceProvider == null) {
- return buildFileSource(true, startupMode == CoreOptions.LogStartupMode.LATEST);
+ return buildFileSource(true, startupMode == LogStartupMode.LATEST);
} else {
- if (startupMode != CoreOptions.LogStartupMode.FULL) {
+ if (startupMode != LogStartupMode.FULL) {
return logSourceProvider.createSource(null);
}
return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
@@ -151,7 +152,7 @@ public class FlinkSourceBuilder {
.orElse(rowType);
DataStreamSource<RowData> dataStream =
env.fromSource(
- conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED)
+ conf.get(COMPACTION_MANUAL_TRIGGERED)
? new FileStoreEmptySource()
: buildSource(),
WatermarkStrategy.noWatermarks(),
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 880c6a9c..c5545658 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.connector.source;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -28,9 +27,10 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
+import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
-import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.predicate.PredicateConverter;
@@ -47,6 +47,9 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
+import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+
/**
* Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
* For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
@@ -59,7 +62,7 @@ public class TableStoreSource
private final ObjectIdentifier tableIdentifier;
private final FileStoreTable table;
private final boolean streaming;
- private final DynamicTableFactory.Context logStoreContext;
+ private final DynamicTableFactory.Context context;
@Nullable private final LogStoreTableFactory logStoreTableFactory;
@Nullable private Predicate predicate;
@@ -69,12 +72,12 @@ public class TableStoreSource
ObjectIdentifier tableIdentifier,
FileStoreTable table,
boolean streaming,
- DynamicTableFactory.Context logStoreContext,
+ DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this.tableIdentifier = tableIdentifier;
this.table = table;
this.streaming = streaming;
- this.logStoreContext = logStoreContext;
+ this.context = context;
this.logStoreTableFactory = logStoreTableFactory;
}
@@ -92,14 +95,9 @@ public class TableStoreSource
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
// optimization: transaction consistency and all changelog mode avoid the generation of
// normalized nodes. See TableStoreSink.getChangelogMode validation.
- Configuration logOptions =
- new DelegatingConfiguration(
- Configuration.fromMap(table.schema().options()),
- CoreOptions.LOG_PREFIX);
- return logOptions.get(CoreOptions.LOG_CONSISTENCY)
- == CoreOptions.LogConsistency.TRANSACTIONAL
- && logOptions.get(CoreOptions.LOG_CHANGELOG_MODE)
- == CoreOptions.LogChangelogMode.ALL
+ Configuration options = Configuration.fromMap(table.schema().options());
+ return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
+ && options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL
? ChangelogMode.all()
: ChangelogMode.upsert();
} else {
@@ -113,8 +111,7 @@ public class TableStoreSource
LogSourceProvider logSourceProvider = null;
if (logStoreTableFactory != null) {
logSourceProvider =
- logStoreTableFactory.createSourceProvider(
- logStoreContext, scanContext, projectFields);
+ logStoreTableFactory.createSourceProvider(context, scanContext, projectFields);
}
FlinkSourceBuilder sourceBuilder =
@@ -125,7 +122,7 @@ public class TableStoreSource
.withPredicate(predicate)
.withParallelism(
Configuration.fromMap(table.schema().options())
- .get(TableStoreFactoryOptions.SCAN_PARALLELISM));
+ .get(FlinkConnectorOptions.SCAN_PARALLELISM));
return new TableStoreDataStreamScanProvider(
!streaming, env -> sourceBuilder.withEnv(env).build());
@@ -135,7 +132,7 @@ public class TableStoreSource
public DynamicTableSource copy() {
TableStoreSource copied =
new TableStoreSource(
- tableIdentifier, table, streaming, logStoreContext, logStoreTableFactory);
+ tableIdentifier, table, streaming, context, logStoreTableFactory);
copied.predicate = predicate;
copied.projectFields = projectFields;
return copied;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
index be277d35..febc784d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
import org.junit.Test;
@@ -29,7 +29,7 @@ import java.util.Collections;
import java.util.stream.Collectors;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.ratesWithTimestamp;
@@ -146,8 +146,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
null,
false,
Collections.singletonMap(
- LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
- CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
"rate_by_to_currency IS NULL",
Arrays.asList(
"corrected_rate_by_to_currency",
@@ -191,8 +190,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
false,
Collections.singletonMap(
- LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
- CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
lateEventFilter,
Collections.emptyList(), // projection
Collections.singletonList(
@@ -215,8 +213,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
WatermarkSpec.of("ts1", "ts1 - INTERVAL '3' YEAR"),
false,
Collections.singletonMap(
- LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
- CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
lateEventFilter.replaceAll("ts", "ts1"),
Arrays.asList("currency", "rate", "ts1"),
Collections.singletonList(
@@ -240,8 +237,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
false,
Collections.singletonMap(
- LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
- CoreOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
lateEventFilter,
Arrays.asList(
"currency",
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
index 9e3be19f..68703685 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -42,6 +41,7 @@ import java.util.List;
import java.util.UUID;
import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -79,8 +79,7 @@ public class CreateTableITCase extends TableStoreTestBase {
assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
.isPresent();
// check table store
- assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile())
- .exists();
+ assertThat(Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile()).exists();
// check log store
assertThat(topicExists(tableIdentifier.asSummaryString())).isEqualTo(enableLogStore);
} else {
@@ -129,7 +128,7 @@ public class CreateTableITCase extends TableStoreTestBase {
}
} else if (expectedResult.expectedMessage.startsWith("Failed to create file store path.")) {
// failed when creating file store
- Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile().mkdirs();
+ Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile().mkdirs();
} else if (expectedResult.expectedMessage.startsWith("Failed to create kafka topic.")) {
// failed when creating log store
createTopicIfNotExists(tableIdentifier.asSummaryString(), BUCKET.defaultValue());
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
index e7278976..177d16c4 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -40,6 +39,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -80,7 +80,7 @@ public class DropTableITCase extends TableStoreTestBase {
assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
.isNotPresent();
// check table store
- assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile())
+ assertThat(Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile())
.doesNotExist();
// check log store
assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 4e313402..754d1354 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -51,9 +51,9 @@ import java.time.Duration;
import java.util.List;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.CoreOptions.relativeTablePath;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
import static org.junit.jupiter.api.Assertions.fail;
/** ITCase for file store table api. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
index 01e84211..69ee10c2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
@@ -60,8 +60,8 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase {
String.format(
"CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
+ "'log.system'='kafka', "
- + "'log.kafka.bootstrap.servers'='%s',"
- + "'log.topic'='%s'"
+ + "'kafka.bootstrap.servers'='%s',"
+ + "'kafka.topic'='%s'"
+ ")",
getBootstrapServers(), topic));
innerTestWriteRead();
@@ -85,8 +85,8 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase {
+ "d AS CAST(c as INT) + 1"
+ ") WITH ("
+ "'log.system'='kafka', "
- + "'log.kafka.bootstrap.servers'='%s',"
- + "'log.topic'='%s'"
+ + "'kafka.bootstrap.servers'='%s',"
+ + "'kafka.topic'='%s'"
+ ")",
getBootstrapServers(), topic));
BlockingIterator<Row, Row> iterator =
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 10adddfb..0f4d5ea5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -58,14 +58,14 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.SCAN_PARALLELISM;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.SINK_PARALLELISM;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRates;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithUB;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
import static org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM;
import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index 33d28349..3c9fcb76 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.types.Row;
@@ -49,7 +49,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithChangelogRecords;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithInsertOnlyRecords;
import static org.apache.flink.table.store.connector.ShowCreateUtil.buildInsertIntoQuery;
@@ -57,8 +61,6 @@ import static org.apache.flink.table.store.connector.ShowCreateUtil.buildInsertO
import static org.apache.flink.table.store.connector.ShowCreateUtil.buildSelectQuery;
import static org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery;
import static org.apache.flink.table.store.connector.ShowCreateUtil.createTableLikeDDL;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.assertj.core.api.Assertions.assertThat;
@@ -71,7 +73,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
protected void checkFileStorePath(
StreamTableEnvironment tEnv, String managedTable, @Nullable String partitionList) {
String relativeFilePath =
- CoreOptions.relativeTablePath(
+ relativeTablePath(
ObjectIdentifier.of(
tEnv.getCurrentCatalog(), tEnv.getCurrentDatabase(), managedTable));
// check snapshot file path
@@ -230,9 +232,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
List<Row> expected)
throws Exception {
Map<String, String> hints = new HashMap<>();
- hints.put(
- LOG_PREFIX + CoreOptions.LOG_SCAN.key(),
- CoreOptions.LogStartupMode.LATEST.name().toLowerCase());
+ hints.put(LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase());
collectAndCheckUnderSameEnv(
true,
true,
@@ -258,9 +258,8 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
List<Row> expected)
throws Exception {
Map<String, String> hints = new HashMap<>();
- hints.put(LOG_PREFIX + CoreOptions.LOG_SCAN.key(), "from-timestamp");
- hints.put(
- LOG_PREFIX + CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
+ hints.put(LOG_SCAN.key(), "from-timestamp");
+ hints.put(LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
collectAndCheckUnderSameEnv(
true,
true,
@@ -292,7 +291,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
tableOptions.put(ROOT_PATH.key(), rootPath);
if (enableLogStore) {
tableOptions.put(LOG_SYSTEM.key(), "kafka");
- tableOptions.put(LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+ tableOptions.put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
}
String sourceTable = "source_table_" + UUID.randomUUID();
String managedTable = "managed_table_" + UUID.randomUUID();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
index 598fb75f..15424e02 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
@@ -90,7 +90,7 @@ public class StreamingWarehouseITCase extends ReadWriteTableTestBase {
+ "WITH (\n"
+ " 'path' = '%s',\n"
+ " 'log.system' = 'kafka', "
- + " 'log.kafka.bootstrap.servers' = '%s');",
+ + " 'kafka.bootstrap.servers' = '%s');",
rootPath, getBootstrapServers());
streamTableEnv.executeSql(orderSource);
streamTableEnv.executeSql(cleanedOrders);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index 31832f59..843b99f5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -57,13 +57,13 @@ import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.flink.table.store.CoreOptions.BUCKET;
import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
import static org.apache.flink.table.store.CoreOptions.PATH;
-import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
import static org.apache.flink.table.store.CoreOptions.path;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_PARTITION_SPEC;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable;
import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PART_TYPE;
import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
@@ -122,20 +122,20 @@ public class TableStoreManagedFactoryTest {
Map<String, String> sessionMap = new HashMap<>();
sessionMap.put("table-store.root-path", "my_path");
sessionMap.put("table-store.log.system", "kafka");
- sessionMap.put("table-store.log.topic", "my_topic");
+ sessionMap.put("table-store.kafka.topic", "my_topic");
context = createNonEnrichedContext(sessionMap, emptyMap());
assertThatThrownBy(() -> tableStoreManagedFactory.enrichOptions(context))
.hasMessage(
"Managed table can not contain custom topic. You need to remove topic in table options or session config.");
- sessionMap.remove("table-store.log.topic");
+ sessionMap.remove("table-store.kafka.topic");
context = createNonEnrichedContext(sessionMap, emptyMap());
Map<String, String> enriched = tableStoreManagedFactory.enrichOptions(context);
Map<String, String> expected = new HashMap<>();
expected.put("path", "my_path/catalog.catalog/database.db/table");
expected.put("log.system", "kafka");
- expected.put("log.topic", "catalog.database.table");
+ expected.put("kafka.topic", "catalog.database.table");
assertThat(enriched).containsExactlyEntriesOf(expected);
}
@@ -188,27 +188,6 @@ public class TableStoreManagedFactoryTest {
}
}
- @Test
- public void testFilterLogStoreOptions() {
- // mix invalid key and leave value to empty to emphasize the deferred validation
- Map<String, String> expectedLogOptions =
- of(
- CoreOptions.LOG_SCAN.key(),
- "",
- CoreOptions.LOG_RETENTION.key(),
- "",
- "dummy.key",
- "",
- CoreOptions.LOG_CHANGELOG_MODE.key(),
- "");
- Map<String, String> enrichedOptions =
- addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
- enrichedOptions.put("foo", "bar");
-
- assertThat(TableStoreManagedFactory.filterLogStoreOptions(enrichedOptions))
- .containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
- }
-
@ParameterizedTest
@MethodSource("provideResolvedTable")
public void testCreateAndCheckTableStore(
@@ -322,9 +301,9 @@ public class TableStoreManagedFactoryTest {
BUCKET.defaultValue().toString(),
ROOT_PATH.key(),
sharedTempDir.toString(),
- LOG_PREFIX + BOOTSTRAP_SERVERS.key(),
+ BOOTSTRAP_SERVERS.key(),
"localhost:9092",
- LOG_PREFIX + LOG_CONSISTENCY.key(),
+ LOG_CONSISTENCY.key(),
LOG_CONSISTENCY.defaultValue().name());
// set configuration under session level
@@ -365,8 +344,7 @@ public class TableStoreManagedFactoryTest {
Map<String, String> expected = new HashMap<>(enrichedOptions);
String rootPath = expected.remove(ROOT_PATH.key());
if (rootPath != null) {
- String path =
- rootPath + "/" + TableStoreManagedFactory.relativeTablePath(TABLE_IDENTIFIER);
+ String path = rootPath + "/" + relativeTablePath(TABLE_IDENTIFIER);
expected.put(PATH.key(), path);
}
return expected;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index 1f321af9..b3284952 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -50,10 +49,10 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX;
-import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
/** End-to-end test base for table store. */
@@ -111,7 +110,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), rootPath);
configuration.setString(
- TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+ TABLE_STORE_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
if (enableLogStore) {
configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(), "kafka");
}
@@ -153,8 +152,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
}
protected void deleteTablePath() {
- FileUtils.deleteQuietly(
- Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile());
+ FileUtils.deleteQuietly(Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile());
}
/** Expected result wrapper. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 59465a73..1f4cb4e5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -21,7 +21,8 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
@@ -102,10 +103,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
KafkaLogTestUtils.testContext(
name,
getBootstrapServers(),
- CoreOptions.LogChangelogMode.AUTO,
- transaction
- ? CoreOptions.LogConsistency.TRANSACTIONAL
- : CoreOptions.LogConsistency.EVENTUAL,
+ LogChangelogMode.AUTO,
+ transaction ? LogConsistency.TRANSACTIONAL : LogConsistency.EVENTUAL,
TABLE_TYPE,
hasPk ? new int[] {2} : new int[0]);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 4a4be635..16134667 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -156,7 +156,7 @@ public class TestChangelogDataReadWrite {
.createEmptyWriter(partition, bucket, service);
((MemoryOwner) writer)
.setMemoryPool(
- new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize));
+ new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
return writer;
}
}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
similarity index 58%
copy from flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
index 8cdb6d0c..925a0d13 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
@@ -16,23 +16,30 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.kafka;
+package org.apache.flink.table.store;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory;
-/** Options for kafka log. */
-public class KafkaLogOptions {
+/** Catalog options for table store. */
+public class CatalogOptions {
+ private CatalogOptions() {}
- public static final ConfigOption<String> BOOTSTRAP_SERVERS =
- ConfigOptions.key("kafka.bootstrap.servers")
+ public static final ConfigOption<String> WAREHOUSE =
+ ConfigOptions.key("warehouse")
.stringType()
.noDefaultValue()
- .withDescription("Required Kafka server connection string");
+ .withDescription("The warehouse root path of catalog.");
- public static final ConfigOption<String> TOPIC =
- ConfigOptions.key("topic")
+ public static final ConfigOption<String> METASTORE =
+ ConfigOptions.key("metastore")
+ .stringType()
+ .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
+
+ public static final ConfigOption<String> URI =
+ ConfigOptions.key("uri")
.stringType()
.noDefaultValue()
- .withDescription("Topic of this kafka table.");
+ .withDescription("Uri of metastore server.");
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 1df27218..07557caa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -26,16 +27,16 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
+import java.lang.reflect.Field;
import java.time.Duration;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.text;
@@ -43,8 +44,6 @@ import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
/** Core options for table store. */
public class CoreOptions implements Serializable {
- public static final String LOG_PREFIX = "log.";
- public static final String TABLE_STORE_PREFIX = "table-store.";
public static final ConfigOption<Integer> BUCKET =
ConfigOptions.key("bucket")
@@ -52,6 +51,7 @@ public class CoreOptions implements Serializable {
.defaultValue(1)
.withDescription("Bucket number for file store.");
+ @Internal
public static final ConfigOption<String> PATH =
ConfigOptions.key("path")
.stringType()
@@ -116,18 +116,17 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofSeconds(1))
.withDescription("The discovery interval of continuous reading.");
- public static final ConfigOption<CoreOptions.MergeEngine> MERGE_ENGINE =
+ public static final ConfigOption<MergeEngine> MERGE_ENGINE =
ConfigOptions.key("merge-engine")
- .enumType(CoreOptions.MergeEngine.class)
- .defaultValue(CoreOptions.MergeEngine.DEDUPLICATE)
+ .enumType(MergeEngine.class)
+ .defaultValue(MergeEngine.DEDUPLICATE)
.withDescription(
Description.builder()
.text("Specify the merge engine for table with primary key.")
.linebreak()
.list(
- formatEnumOption(CoreOptions.MergeEngine.DEDUPLICATE),
- formatEnumOption(
- CoreOptions.MergeEngine.PARTIAL_UPDATE))
+ formatEnumOption(MergeEngine.DEDUPLICATE),
+ formatEnumOption(MergeEngine.PARTIAL_UPDATE))
.build());
public static final ConfigOption<WriteMode> WRITE_MODE =
@@ -230,7 +229,7 @@ public class CoreOptions implements Serializable {
+ "it can be read directly during stream reads.");
public static final ConfigOption<LogStartupMode> LOG_SCAN =
- ConfigOptions.key("scan")
+ ConfigOptions.key("log.scan")
.enumType(LogStartupMode.class)
.defaultValue(LogStartupMode.FULL)
.withDescription(
@@ -243,21 +242,21 @@ public class CoreOptions implements Serializable {
.build());
public static final ConfigOption<Long> LOG_SCAN_TIMESTAMP_MILLS =
- ConfigOptions.key("scan.timestamp-millis")
+ ConfigOptions.key("log.scan.timestamp-millis")
.longType()
.noDefaultValue()
.withDescription(
"Optional timestamp used in case of \"from-timestamp\" scan mode");
public static final ConfigOption<Duration> LOG_RETENTION =
- ConfigOptions.key("retention")
+ ConfigOptions.key("log.retention")
.durationType()
.noDefaultValue()
.withDescription(
"It means how long changes log will be kept. The default value is from the log system cluster.");
public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
- ConfigOptions.key("consistency")
+ ConfigOptions.key("log.consistency")
.enumType(LogConsistency.class)
.defaultValue(LogConsistency.TRANSACTIONAL)
.withDescription(
@@ -270,7 +269,7 @@ public class CoreOptions implements Serializable {
.build());
public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE =
- ConfigOptions.key("changelog-mode")
+ ConfigOptions.key("log.changelog-mode")
.enumType(LogChangelogMode.class)
.defaultValue(LogChangelogMode.AUTO)
.withDescription(
@@ -284,79 +283,27 @@ public class CoreOptions implements Serializable {
.build());
public static final ConfigOption<String> LOG_KEY_FORMAT =
- ConfigOptions.key("key.format")
+ ConfigOptions.key("log.key.format")
.stringType()
.defaultValue("json")
.withDescription(
"Specify the key message format of log system with primary key.");
public static final ConfigOption<String> LOG_FORMAT =
- ConfigOptions.key("format")
+ ConfigOptions.key("log.format")
.stringType()
.defaultValue("debezium-json")
.withDescription("Specify the message format of log system.");
- public long writeBufferSize;
+ private final Configuration options;
- public int pageSize;
-
- public long targetFileSize;
-
- public int numSortedRunCompactionTrigger;
-
- public int numSortedRunStopTrigger;
-
- public int numLevels;
-
- public boolean commitForceCompact;
-
- public int maxSizeAmplificationPercent;
-
- public int sizeRatio;
-
- public boolean enableChangelogFile;
-
- private Configuration options;
-
- public CoreOptions(
- long writeBufferSize,
- int pageSize,
- long targetFileSize,
- int numSortedRunCompactionTrigger,
- int numSortedRunStopTrigger,
- Integer numLevels,
- boolean commitForceCompact,
- int maxSizeAmplificationPercent,
- int sizeRatio,
- boolean enableChangelogFile) {
- this.writeBufferSize = writeBufferSize;
- this.pageSize = pageSize;
- this.targetFileSize = targetFileSize;
- this.numSortedRunCompactionTrigger = numSortedRunCompactionTrigger;
- this.numSortedRunStopTrigger =
- Math.max(numSortedRunCompactionTrigger, numSortedRunStopTrigger);
- // By default, this ensures that the compaction does not fall to level 0, but at least to
- // level 1
- this.numLevels = numLevels == null ? numSortedRunCompactionTrigger + 1 : numLevels;
- this.commitForceCompact = commitForceCompact;
- this.maxSizeAmplificationPercent = maxSizeAmplificationPercent;
- this.sizeRatio = sizeRatio;
- this.enableChangelogFile = enableChangelogFile;
+ public CoreOptions(Map<String, String> options) {
+ this(Configuration.fromMap(options));
}
- public CoreOptions(Configuration config) {
- this(
- config.get(WRITE_BUFFER_SIZE).getBytes(),
- (int) config.get(PAGE_SIZE).getBytes(),
- config.get(TARGET_FILE_SIZE).getBytes(),
- config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER),
- config.get(NUM_SORTED_RUNS_STOP_TRIGGER),
- config.get(NUM_LEVELS),
- config.get(COMMIT_FORCE_COMPACT),
- config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT),
- config.get(COMPACTION_SIZE_RATIO),
- config.get(CHANGELOG_FILE));
- this.options = config;
+ public CoreOptions(Configuration options) {
+ this.options = options;
+ // TODO validate all keys
Preconditions.checkArgument(
snapshotNumRetainMin() > 0,
SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
@@ -367,39 +314,6 @@ public class CoreOptions implements Serializable {
+ SNAPSHOT_NUM_RETAINED_MAX.key());
}
- public CoreOptions(Map<String, String> options) {
- this(Configuration.fromMap(options));
- }
-
- public static Set<ConfigOption<?>> allOptions() {
- Set<ConfigOption<?>> allOptions = new HashSet<>();
- allOptions.add(BUCKET);
- allOptions.add(PATH);
- allOptions.add(FILE_FORMAT);
- allOptions.add(MANIFEST_FORMAT);
- allOptions.add(MANIFEST_TARGET_FILE_SIZE);
- allOptions.add(MANIFEST_MERGE_MIN_COUNT);
- allOptions.add(PARTITION_DEFAULT_NAME);
- allOptions.add(SNAPSHOT_NUM_RETAINED_MIN);
- allOptions.add(SNAPSHOT_NUM_RETAINED_MAX);
- allOptions.add(SNAPSHOT_TIME_RETAINED);
- allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL);
- allOptions.add(MERGE_ENGINE);
- allOptions.add(WRITE_MODE);
- allOptions.add(SOURCE_SPLIT_TARGET_SIZE);
- allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST);
- allOptions.add(WRITE_BUFFER_SIZE);
- allOptions.add(PAGE_SIZE);
- allOptions.add(TARGET_FILE_SIZE);
- allOptions.add(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
- allOptions.add(NUM_SORTED_RUNS_STOP_TRIGGER);
- allOptions.add(NUM_LEVELS);
- allOptions.add(COMMIT_FORCE_COMPACT);
- allOptions.add(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
- allOptions.add(COMPACTION_SIZE_RATIO);
- return allOptions;
- }
-
public int bucket() {
return options.get(BUCKET);
}
@@ -416,14 +330,6 @@ public class CoreOptions implements Serializable {
return new Path(options.get(PATH));
}
- public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
- return String.format(
- "%s.catalog/%s.db/%s",
- tableIdentifier.getCatalogName(),
- tableIdentifier.getDatabaseName(),
- tableIdentifier.getObjectName());
- }
-
public FileFormat fileFormat() {
return FileFormat.fromTableOptions(options, FILE_FORMAT);
}
@@ -464,6 +370,50 @@ public class CoreOptions implements Serializable {
return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
}
+ public long writeBufferSize() {
+ return options.get(WRITE_BUFFER_SIZE).getBytes();
+ }
+
+ public int pageSize() {
+ return (int) options.get(PAGE_SIZE).getBytes();
+ }
+
+ public long targetFileSize() {
+ return options.get(TARGET_FILE_SIZE).getBytes();
+ }
+
+ public int numSortedRunCompactionTrigger() {
+ return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
+ }
+
+ public int numSortedRunStopTrigger() {
+ return Math.max(numSortedRunCompactionTrigger(), options.get(NUM_SORTED_RUNS_STOP_TRIGGER));
+ }
+
+ public int numLevels() {
+ // By default, this ensures that the compaction does not fall to level 0, but at least to
+ // level 1
+ Integer numLevels = options.get(NUM_LEVELS);
+ numLevels = numLevels == null ? numSortedRunCompactionTrigger() + 1 : numLevels;
+ return numLevels;
+ }
+
+ public boolean commitForceCompact() {
+ return options.get(COMMIT_FORCE_COMPACT);
+ }
+
+ public int maxSizeAmplificationPercent() {
+ return options.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
+ }
+
+ public int sizeRatio() {
+ return options.get(COMPACTION_SIZE_RATIO);
+ }
+
+ public boolean enableChangelogFile() {
+ return options.get(CHANGELOG_FILE);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
@@ -579,4 +529,20 @@ public class CoreOptions implements Serializable {
return text(description);
}
}
+
+ @Internal
+ public static List<ConfigOption<?>> getOptions() {
+ final Field[] fields = CoreOptions.class.getFields();
+ final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
+ for (Field field : fields) {
+ if (ConfigOption.class.isAssignableFrom(field.getType())) {
+ try {
+ list.add((ConfigOption<?>) field.get(CoreOptions.class));
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return list;
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 6236a39f..897ab2dc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -62,7 +62,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
pathFactory(),
snapshotManager(),
newScan(true),
- options.targetFileSize);
+ options.targetFileSize());
}
private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
deleted file mode 100644
index 43065064..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DescribedEnum;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.description.Description;
-import org.apache.flink.configuration.description.InlineElement;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.format.FileFormat;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.time.Duration;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-import static org.apache.flink.configuration.description.TextElement.text;
-import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
-
-/** Options for {@link FileStore}. */
-public class FileStoreOptions implements Serializable {
-
- public static final String TABLE_STORE_PREFIX = "table-store.";
-
- public static final ConfigOption<Integer> BUCKET =
- ConfigOptions.key("bucket")
- .intType()
- .defaultValue(1)
- .withDescription("Bucket number for file store.");
-
- public static final ConfigOption<String> PATH =
- ConfigOptions.key("path")
- .stringType()
- .noDefaultValue()
- .withDescription("The file path of this table in the filesystem.");
-
- public static final ConfigOption<String> FILE_FORMAT =
- ConfigOptions.key("file.format")
- .stringType()
- .defaultValue("orc")
- .withDescription("Specify the message format of data files.");
-
- public static final ConfigOption<String> MANIFEST_FORMAT =
- ConfigOptions.key("manifest.format")
- .stringType()
- .defaultValue("avro")
- .withDescription("Specify the message format of manifest files.");
-
- public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
- ConfigOptions.key("manifest.target-file-size")
- .memoryType()
- .defaultValue(MemorySize.ofMebiBytes(8))
- .withDescription("Suggested file size of a manifest file.");
-
- public static final ConfigOption<Integer> MANIFEST_MERGE_MIN_COUNT =
- ConfigOptions.key("manifest.merge-min-count")
- .intType()
- .defaultValue(30)
- .withDescription(
- "To avoid frequent manifest merges, this parameter specifies the minimum number "
- + "of ManifestFileMeta to merge.");
-
- public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
- key("partition.default-name")
- .stringType()
- .defaultValue("__DEFAULT_PARTITION__")
- .withDescription(
- "The default partition name in case the dynamic partition"
- + " column value is null/empty string.");
-
- public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN =
- ConfigOptions.key("snapshot.num-retained.min")
- .intType()
- .defaultValue(10)
- .withDescription("The minimum number of completed snapshots to retain.");
-
- public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MAX =
- ConfigOptions.key("snapshot.num-retained.max")
- .intType()
- .defaultValue(Integer.MAX_VALUE)
- .withDescription("The maximum number of completed snapshots to retain.");
-
- public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
- ConfigOptions.key("snapshot.time-retained")
- .durationType()
- .defaultValue(Duration.ofHours(1))
- .withDescription("The maximum time of completed snapshots to retain.");
-
- public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
- ConfigOptions.key("continuous.discovery-interval")
- .durationType()
- .defaultValue(Duration.ofSeconds(1))
- .withDescription("The discovery interval of continuous reading.");
-
- public static final ConfigOption<MergeEngine> MERGE_ENGINE =
- ConfigOptions.key("merge-engine")
- .enumType(MergeEngine.class)
- .defaultValue(MergeEngine.DEDUPLICATE)
- .withDescription(
- Description.builder()
- .text("Specify the merge engine for table with primary key.")
- .linebreak()
- .list(
- formatEnumOption(MergeEngine.DEDUPLICATE),
- formatEnumOption(MergeEngine.PARTIAL_UPDATE))
- .build());
-
- public static final ConfigOption<WriteMode> WRITE_MODE =
- ConfigOptions.key("write-mode")
- .enumType(WriteMode.class)
- .defaultValue(WriteMode.CHANGE_LOG)
- .withDescription(
- Description.builder()
- .text("Specify the write mode for table.")
- .linebreak()
- .list(formatEnumOption(WriteMode.APPEND_ONLY))
- .list(formatEnumOption(WriteMode.CHANGE_LOG))
- .build());
-
- public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
- ConfigOptions.key("source.split.target-size")
- .memoryType()
- .defaultValue(MemorySize.ofMebiBytes(128))
- .withDescription("Target size of a source split when scanning a bucket.");
-
- public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
- ConfigOptions.key("source.split.open-file-cost")
- .memoryType()
- .defaultValue(MemorySize.ofMebiBytes(4))
- .withDescription(
- "Open file cost of a source file. It is used to avoid reading"
- + " too many files with a source split, which can be very slow.");
-
- private final Configuration options;
-
- public static Set<ConfigOption<?>> allOptions() {
- Set<ConfigOption<?>> allOptions = new HashSet<>();
- allOptions.add(BUCKET);
- allOptions.add(PATH);
- allOptions.add(FILE_FORMAT);
- allOptions.add(MANIFEST_FORMAT);
- allOptions.add(MANIFEST_TARGET_FILE_SIZE);
- allOptions.add(MANIFEST_MERGE_MIN_COUNT);
- allOptions.add(PARTITION_DEFAULT_NAME);
- allOptions.add(SNAPSHOT_NUM_RETAINED_MIN);
- allOptions.add(SNAPSHOT_NUM_RETAINED_MAX);
- allOptions.add(SNAPSHOT_TIME_RETAINED);
- allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL);
- allOptions.add(MERGE_ENGINE);
- allOptions.add(WRITE_MODE);
- allOptions.add(SOURCE_SPLIT_TARGET_SIZE);
- allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST);
- return allOptions;
- }
-
- public FileStoreOptions(Map<String, String> options) {
- this(Configuration.fromMap(options));
- }
-
- public FileStoreOptions(Configuration options) {
- this.options = options;
- // TODO validate all keys
- Preconditions.checkArgument(
- snapshotNumRetainMin() > 0,
- SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
- Preconditions.checkArgument(
- snapshotNumRetainMin() <= snapshotNumRetainMax(),
- SNAPSHOT_NUM_RETAINED_MIN.key()
- + " should not be larger than "
- + SNAPSHOT_NUM_RETAINED_MAX.key());
- }
-
- public int bucket() {
- return options.get(BUCKET);
- }
-
- public Path path() {
- return path(options.toMap());
- }
-
- public static Path path(Map<String, String> options) {
- return new Path(options.get(PATH.key()));
- }
-
- public static Path path(Configuration options) {
- return new Path(options.get(PATH));
- }
-
- public static String relativeTablePath(ObjectIdentifier tableIdentifier) {
- return String.format(
- "%s.catalog/%s.db/%s",
- tableIdentifier.getCatalogName(),
- tableIdentifier.getDatabaseName(),
- tableIdentifier.getObjectName());
- }
-
- public FileFormat fileFormat() {
- return FileFormat.fromTableOptions(options, FILE_FORMAT);
- }
-
- public FileFormat manifestFormat() {
- return FileFormat.fromTableOptions(options, MANIFEST_FORMAT);
- }
-
- public MemorySize manifestTargetSize() {
- return options.get(MANIFEST_TARGET_FILE_SIZE);
- }
-
- public String partitionDefaultName() {
- return options.get(PARTITION_DEFAULT_NAME);
- }
-
- public MergeTreeOptions mergeTreeOptions() {
- return new MergeTreeOptions(options);
- }
-
- public int snapshotNumRetainMin() {
- return options.get(SNAPSHOT_NUM_RETAINED_MIN);
- }
-
- public int snapshotNumRetainMax() {
- return options.get(SNAPSHOT_NUM_RETAINED_MAX);
- }
-
- public Duration snapshotTimeRetain() {
- return options.get(SNAPSHOT_TIME_RETAINED);
- }
-
- public int manifestMergeMinCount() {
- return options.get(MANIFEST_MERGE_MIN_COUNT);
- }
-
- public long splitTargetSize() {
- return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
- }
-
- public long splitOpenFileCost() {
- return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
- }
-
- /** Specifies the merge engine for table with primary key. */
- public enum MergeEngine implements DescribedEnum {
- DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
-
- PARTIAL_UPDATE("partial-update", "Partial update non-null fields.");
-
- private final String value;
- private final String description;
-
- MergeEngine(String value, String description) {
- this.value = value;
- this.description = description;
- }
-
- @Override
- public String toString() {
- return value;
- }
-
- @Override
- public InlineElement getDescription() {
- return text(description);
- }
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index fb751a73..9443ddd2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -34,6 +34,8 @@ import java.util.function.Supplier;
/** {@link FileStore} for querying and updating {@link KeyValue}s. */
public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
+ private static final long serialVersionUID = 1L;
+
private final RowType keyType;
private final RowType valueType;
private final Supplier<Comparator<RowData>> keyComparatorSupplier;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index 403c8be0..608cd266 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.file.catalog;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
@@ -28,6 +26,9 @@ import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.CatalogOptions.METASTORE;
+import static org.apache.flink.table.store.CatalogOptions.WAREHOUSE;
+
/** Factory to create {@link Catalog}. Each factory should have a unique identifier. */
public interface CatalogFactory {
@@ -35,17 +36,6 @@ public interface CatalogFactory {
Catalog create(String warehouse, ReadableConfig options);
- ConfigOption<String> METASTORE =
- ConfigOptions.key("metastore")
- .stringType()
- .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
-
- ConfigOption<String> WAREHOUSE =
- ConfigOptions.key("warehouse")
- .stringType()
- .noDefaultValue()
- .withDescription("The warehouse root path of catalog.");
-
static Catalog createCatalog(ReadableConfig options) {
// manual validation
// because different catalog types may have different options
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
deleted file mode 100644
index 3f3bfa07..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.mergetree;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.ReadableConfig;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/** Options for merge tree. */
-public class MergeTreeOptions {
-
- public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
- ConfigOptions.key("write-buffer-size")
- .memoryType()
- .defaultValue(MemorySize.parse("256 mb"))
- .withDescription(
- "Amount of data to build up in memory before converting to a sorted on-disk file.");
-
- public static final ConfigOption<MemorySize> PAGE_SIZE =
- ConfigOptions.key("page-size")
- .memoryType()
- .defaultValue(MemorySize.parse("64 kb"))
- .withDescription("Memory page size.");
-
- public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
- ConfigOptions.key("target-file-size")
- .memoryType()
- .defaultValue(MemorySize.ofMebiBytes(128))
- .withDescription("Target size of a file.");
-
- public static final ConfigOption<Integer> NUM_SORTED_RUNS_COMPACTION_TRIGGER =
- ConfigOptions.key("num-sorted-run.compaction-trigger")
- .intType()
- .defaultValue(5)
- .withDescription(
- "The sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and "
- + "high-level runs (one level one sorted run).");
-
- public static final ConfigOption<Integer> NUM_SORTED_RUNS_STOP_TRIGGER =
- ConfigOptions.key("num-sorted-run.stop-trigger")
- .intType()
- .defaultValue(10)
- .withDescription(
- "The number of sorted-runs that trigger the stopping of writes.");
-
- public static final ConfigOption<Integer> NUM_LEVELS =
- ConfigOptions.key("num-levels")
- .intType()
- .noDefaultValue()
- .withDescription(
- "Total level number, for example, there are 3 levels, including 0,1,2 levels.");
-
- public static final ConfigOption<Boolean> COMMIT_FORCE_COMPACT =
- ConfigOptions.key("commit.force-compact")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether to force a compaction before commit.");
-
- public static final ConfigOption<Integer> COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT =
- ConfigOptions.key("compaction.max-size-amplification-percent")
- .intType()
- .defaultValue(200)
- .withDescription(
- "The size amplification is defined as the amount (in percentage) of additional storage "
- + "needed to store a single byte of data in the merge tree.");
-
- public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO =
- ConfigOptions.key("compaction.size-ratio")
- .intType()
- .defaultValue(1)
- .withDescription(
- "Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) "
- + "size is 1% smaller than the next sorted run's size, then include next sorted run "
- + "into this candidate set.");
-
- public static final ConfigOption<Boolean> CHANGELOG_FILE =
- ConfigOptions.key("changelog-file")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Whether to double write to a changelog file when flushing memory table. "
- + "This changelog file keeps the order of data input and the details of data changes, "
- + "it can be read directly during stream reads.");
-
- public final long writeBufferSize;
-
- public final int pageSize;
-
- public final long targetFileSize;
-
- public final int numSortedRunCompactionTrigger;
-
- public final int numSortedRunStopTrigger;
-
- public final int numLevels;
-
- public final boolean commitForceCompact;
-
- public final int maxSizeAmplificationPercent;
-
- public final int sizeRatio;
-
- public final boolean enableChangelogFile;
-
- public MergeTreeOptions(
- long writeBufferSize,
- int pageSize,
- long targetFileSize,
- int numSortedRunCompactionTrigger,
- int numSortedRunStopTrigger,
- Integer numLevels,
- boolean commitForceCompact,
- int maxSizeAmplificationPercent,
- int sizeRatio,
- boolean enableChangelogFile) {
- this.writeBufferSize = writeBufferSize;
- this.pageSize = pageSize;
- this.targetFileSize = targetFileSize;
- this.numSortedRunCompactionTrigger = numSortedRunCompactionTrigger;
- this.numSortedRunStopTrigger =
- Math.max(numSortedRunCompactionTrigger, numSortedRunStopTrigger);
- // By default, this ensures that the compaction does not fall to level 0, but at least to
- // level 1
- this.numLevels = numLevels == null ? numSortedRunCompactionTrigger + 1 : numLevels;
- this.commitForceCompact = commitForceCompact;
- this.maxSizeAmplificationPercent = maxSizeAmplificationPercent;
- this.sizeRatio = sizeRatio;
- this.enableChangelogFile = enableChangelogFile;
- }
-
- public MergeTreeOptions(ReadableConfig config) {
- this(
- config.get(WRITE_BUFFER_SIZE).getBytes(),
- (int) config.get(PAGE_SIZE).getBytes(),
- config.get(TARGET_FILE_SIZE).getBytes(),
- config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER),
- config.get(NUM_SORTED_RUNS_STOP_TRIGGER),
- config.get(NUM_LEVELS),
- config.get(COMMIT_FORCE_COMPACT),
- config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT),
- config.get(COMPACTION_SIZE_RATIO),
- config.get(CHANGELOG_FILE));
- }
-
- public static Set<ConfigOption<?>> allOptions() {
- Set<ConfigOption<?>> allOptions = new HashSet<>();
- allOptions.add(WRITE_BUFFER_SIZE);
- allOptions.add(PAGE_SIZE);
- allOptions.add(TARGET_FILE_SIZE);
- allOptions.add(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
- allOptions.add(NUM_SORTED_RUNS_STOP_TRIGGER);
- allOptions.add(NUM_LEVELS);
- allOptions.add(COMMIT_FORCE_COMPACT);
- allOptions.add(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
- allOptions.add(COMPACTION_SIZE_RATIO);
- return allOptions;
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index b06df389..09cd4604 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -85,7 +85,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
valueType,
fileFormat,
pathFactory,
- options.targetFileSize);
+ options.targetFileSize());
this.keyComparatorSupplier = keyComparatorSupplier;
this.mergeFunction = mergeFunction;
this.options = options;
@@ -112,10 +112,10 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
}
Comparator<RowData> keyComparator = keyComparatorSupplier.get();
CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
- Levels levels = new Levels(keyComparator, compactFiles, options.numLevels);
+ Levels levels = new Levels(keyComparator, compactFiles, options.numLevels());
CompactUnit unit =
CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns());
- return new CompactTask(keyComparator, options.targetFileSize, rewriter, unit, true);
+ return new CompactTask(keyComparator, options.targetFileSize(), rewriter, unit, true);
}
private MergeTreeWriter createMergeTreeWriter(
@@ -132,18 +132,18 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
partition,
bucket,
new UniversalCompaction(
- options.maxSizeAmplificationPercent,
- options.sizeRatio,
- options.numSortedRunCompactionTrigger),
+ options.maxSizeAmplificationPercent(),
+ options.sizeRatio(),
+ options.numSortedRunCompactionTrigger()),
compactExecutor),
- new Levels(keyComparator, restoreFiles, options.numLevels),
+ new Levels(keyComparator, restoreFiles, options.numLevels()),
getMaxSequenceNumber(restoreFiles),
keyComparator,
mergeFunction.copy(),
dataFileWriter,
- options.commitForceCompact,
- options.numSortedRunStopTrigger,
- options.enableChangelogFile);
+ options.commitForceCompact(),
+ options.numSortedRunStopTrigger(),
+ options.enableChangelogFile());
}
private CompactManager createCompactManager(
@@ -154,7 +154,11 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
Comparator<RowData> keyComparator = keyComparatorSupplier.get();
CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
return new CompactManager(
- compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter);
+ compactExecutor,
+ compactStrategy,
+ keyComparator,
+ options.targetFileSize(),
+ rewriter);
}
private CompactRewriter compactRewriter(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
deleted file mode 100644
index f975233d..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.log;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.DescribedEnum;
-import org.apache.flink.configuration.description.Description;
-import org.apache.flink.configuration.description.InlineElement;
-
-import java.time.Duration;
-
-import static org.apache.flink.configuration.description.TextElement.text;
-import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
-
-/** Options for log store. */
-public class LogOptions {
-
- public static final String LOG_PREFIX = "log.";
-
- public static final ConfigOption<LogStartupMode> SCAN =
- ConfigOptions.key("scan")
- .enumType(LogStartupMode.class)
- .defaultValue(LogStartupMode.FULL)
- .withDescription(
- Description.builder()
- .text("Specify the startup mode for log consumer.")
- .linebreak()
- .list(formatEnumOption(LogStartupMode.FULL))
- .list(formatEnumOption(LogStartupMode.LATEST))
- .list(formatEnumOption(LogStartupMode.FROM_TIMESTAMP))
- .build());
-
- public static final ConfigOption<Long> SCAN_TIMESTAMP_MILLS =
- ConfigOptions.key("scan.timestamp-millis")
- .longType()
- .noDefaultValue()
- .withDescription(
- "Optional timestamp used in case of \"from-timestamp\" scan mode");
-
- public static final ConfigOption<Duration> RETENTION =
- ConfigOptions.key("retention")
- .durationType()
- .noDefaultValue()
- .withDescription(
- "It means how long changes log will be kept. The default value is from the log system cluster.");
-
- public static final ConfigOption<LogConsistency> CONSISTENCY =
- ConfigOptions.key("consistency")
- .enumType(LogConsistency.class)
- .defaultValue(LogConsistency.TRANSACTIONAL)
- .withDescription(
- Description.builder()
- .text("Specify the log consistency mode for table.")
- .linebreak()
- .list(
- formatEnumOption(LogConsistency.TRANSACTIONAL),
- formatEnumOption(LogConsistency.EVENTUAL))
- .build());
-
- public static final ConfigOption<LogChangelogMode> CHANGELOG_MODE =
- ConfigOptions.key("changelog-mode")
- .enumType(LogChangelogMode.class)
- .defaultValue(LogChangelogMode.AUTO)
- .withDescription(
- Description.builder()
- .text("Specify the log changelog mode for table.")
- .linebreak()
- .list(
- formatEnumOption(LogChangelogMode.AUTO),
- formatEnumOption(LogChangelogMode.ALL),
- formatEnumOption(LogChangelogMode.UPSERT))
- .build());
-
- public static final ConfigOption<String> KEY_FORMAT =
- ConfigOptions.key("key.format")
- .stringType()
- .defaultValue("json")
- .withDescription(
- "Specify the key message format of log system with primary key.");
-
- public static final ConfigOption<String> FORMAT =
- ConfigOptions.key("format")
- .stringType()
- .defaultValue("debezium-json")
- .withDescription("Specify the message format of log system.");
-
- /** Specifies the startup mode for log consumer. */
- public enum LogStartupMode implements DescribedEnum {
- FULL(
- "full",
- "Perform a snapshot on the table upon first startup,"
- + " and continue to read the latest changes."),
-
- LATEST("latest", "Start from the latest."),
-
- FROM_TIMESTAMP("from-timestamp", "Start from user-supplied timestamp.");
-
- private final String value;
- private final String description;
-
- LogStartupMode(String value, String description) {
- this.value = value;
- this.description = description;
- }
-
- @Override
- public String toString() {
- return value;
- }
-
- @Override
- public InlineElement getDescription() {
- return text(description);
- }
- }
-
- /** Specifies the log consistency mode for table. */
- public enum LogConsistency implements DescribedEnum {
- TRANSACTIONAL(
- "transactional",
- "Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval."),
-
- EVENTUAL(
- "eventual",
- "Immediate data visibility, you may see some intermediate states, "
- + "but eventually the right results will be produced, only works for table with primary key.");
-
- private final String value;
- private final String description;
-
- LogConsistency(String value, String description) {
- this.value = value;
- this.description = description;
- }
-
- @Override
- public String toString() {
- return value;
- }
-
- @Override
- public InlineElement getDescription() {
- return text(description);
- }
- }
-
- /** Specifies the log changelog mode for table. */
- public enum LogChangelogMode implements DescribedEnum {
- AUTO("auto", "Upsert for table with primary key, all for table without primary key.."),
-
- ALL("all", "The log system stores all changes including UPDATE_BEFORE."),
-
- UPSERT(
- "upsert",
- "The log system does not store the UPDATE_BEFORE changes, the log consumed job"
- + " will automatically add the normalized node, relying on the state"
- + " to generate the required update_before.");
-
- private final String value;
- private final String description;
-
- LogChangelogMode(String value, String description) {
- this.value = value;
- this.description = description;
- }
-
- @Override
- public String toString() {
- return value;
- }
-
- @Override
- public InlineElement getDescription() {
- return text(description);
- }
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 4ae4db06..0df149ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -34,13 +34,15 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.flink.table.factories.SerializationFormatFactory;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
import java.util.Map;
+import static org.apache.flink.table.store.CoreOptions.LOG_FORMAT;
+import static org.apache.flink.table.store.CoreOptions.LOG_KEY_FORMAT;
+
/**
* Base interface for configuring a default log table connector. The log table is used by managed
* table factory.
@@ -83,36 +85,32 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(
TableFactoryHelper helper) {
DecodingFormat<DeserializationSchema<RowData>> format =
- helper.discoverDecodingFormat(
- DeserializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT);
- validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT));
+ helper.discoverDecodingFormat(DeserializationFormatFactory.class, LOG_KEY_FORMAT);
+ validateKeyFormat(format, helper.getOptions().get(LOG_KEY_FORMAT));
return format;
}
static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(
TableFactoryHelper helper) {
EncodingFormat<SerializationSchema<RowData>> format =
- helper.discoverEncodingFormat(
- SerializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT);
- validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT));
+ helper.discoverEncodingFormat(SerializationFormatFactory.class, LOG_KEY_FORMAT);
+ validateKeyFormat(format, helper.getOptions().get(LOG_KEY_FORMAT));
return format;
}
static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
TableFactoryHelper helper) {
DecodingFormat<DeserializationSchema<RowData>> format =
- helper.discoverDecodingFormat(
- DeserializationFormatFactory.class, CoreOptions.LOG_FORMAT);
- validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT));
+ helper.discoverDecodingFormat(DeserializationFormatFactory.class, LOG_FORMAT);
+ validateValueFormat(format, helper.getOptions().get(LOG_FORMAT));
return format;
}
static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
TableFactoryHelper helper) {
EncodingFormat<SerializationSchema<RowData>> format =
- helper.discoverEncodingFormat(
- SerializationFormatFactory.class, CoreOptions.LOG_FORMAT);
- validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT));
+ helper.discoverEncodingFormat(SerializationFormatFactory.class, LOG_FORMAT);
+ validateValueFormat(format, helper.getOptions().get(LOG_FORMAT));
return format;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
index 1e997477..1f49b1b7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
@@ -42,7 +42,7 @@ public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T> {
super(write, recordConverter);
HeapMemorySegmentPool memoryPool =
- new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize);
+ new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize());
this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, this::memoryOwners);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 2de27d14..eecb0f30 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -122,7 +122,7 @@ public class MergeTreeTest {
valueType,
flushingAvro,
pathFactory,
- options.targetFileSize)
+ options.targetFileSize())
.create(BinaryRowDataUtil.EMPTY_ROW, 0);
writer = createMergeTreeWriter(Collections.emptyList());
}
@@ -271,15 +271,16 @@ public class MergeTreeTest {
dataFileWriter.keyType(),
dataFileWriter.valueType(),
createCompactManager(dataFileWriter, service),
- new Levels(comparator, files, options.numLevels),
+ new Levels(comparator, files, options.numLevels()),
maxSequenceNumber,
comparator,
new DeduplicateMergeFunction(),
dataFileWriter,
- options.commitForceCompact,
- options.numSortedRunStopTrigger,
+ options.commitForceCompact(),
+ options.numSortedRunStopTrigger(),
false);
- writer.setMemoryPool(new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize));
+ writer.setMemoryPool(
+ new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
return writer;
}
@@ -287,9 +288,9 @@ public class MergeTreeTest {
DataFileWriter dataFileWriter, ExecutorService compactExecutor) {
CompactStrategy compactStrategy =
new UniversalCompaction(
- options.maxSizeAmplificationPercent,
- options.sizeRatio,
- options.numSortedRunCompactionTrigger);
+ options.maxSizeAmplificationPercent(),
+ options.sizeRatio(),
+ options.numSortedRunCompactionTrigger());
CompactRewriter rewriter =
(outputLevel, dropDelete, sections) ->
dataFileWriter.write(
@@ -302,7 +303,7 @@ public class MergeTreeTest {
new DeduplicateMergeFunction())),
outputLevel);
return new CompactManager(
- compactExecutor, compactStrategy, comparator, options.targetFileSize, rewriter);
+ compactExecutor, compactStrategy, comparator, options.targetFileSize(), rewriter);
}
private void mergeCompacted(
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
index da385956..487e3198 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
@@ -74,7 +74,7 @@ public class LogStoreE2eTest extends E2eTestBase {
+ " 'root-path' = '%s',\n"
+ " 'log.consistency' = 'eventual',\n"
+ " 'log.system' = 'kafka',\n"
- + " 'log.kafka.bootstrap.servers' = '%s'\n"
+ + " 'kafka.bootstrap.servers' = '%s'\n"
+ ");";
tableStoreStreamDdl =
String.format(
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
index 1e2cbabb..ec0dc2d1 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
@@ -18,24 +18,18 @@
package org.apache.flink.table.store.hive;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
import org.apache.flink.util.Preconditions;
+import static org.apache.flink.table.store.CatalogOptions.URI;
+
/** Factory to create {@link HiveCatalog}. */
public class HiveCatalogFactory implements CatalogFactory {
private static final String IDENTIFIER = "hive";
- private static final ConfigOption<String> URI =
- ConfigOptions.key("uri")
- .stringType()
- .noDefaultValue()
- .withDescription("Uri of Hive metastore's thrift server.");
-
@Override
public String identifier() {
return IDENTIFIER;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
index 8cdb6d0c..4b4527b8 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
@@ -31,7 +31,7 @@ public class KafkaLogOptions {
.withDescription("Required Kafka server connection string");
public static final ConfigOption<String> TOPIC =
- ConfigOptions.key("topic")
+ ConfigOptions.key("kafka.topic")
.stringType()
.noDefaultValue()
.withDescription("Topic of this kafka table.");
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 85870e9f..d05a06cc 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -57,8 +57,6 @@ import java.util.concurrent.ExecutionException;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.CoreOptions.LOG_FORMAT;
-import static org.apache.flink.table.store.CoreOptions.LOG_KEY_FORMAT;
import static org.apache.flink.table.store.CoreOptions.LOG_RETENTION;
import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
@@ -88,16 +86,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(LOG_SCAN);
- options.add(TOPIC);
- options.add(LOG_SCAN_TIMESTAMP_MILLS);
- options.add(LOG_RETENTION);
- options.add(LOG_CONSISTENCY);
- options.add(LOG_CHANGELOG_MODE);
- options.add(LOG_KEY_FORMAT);
- options.add(LOG_FORMAT);
- return options;
+ return new HashSet<>();
}
@Override
@@ -119,12 +108,10 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
@Override
public void onCreateTable(Context context, int numBucket, boolean ignoreIfExists) {
- FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
- helper.validateExcept(KAFKA_PREFIX);
- try (AdminClient adminClient = AdminClient.create(toKafkaProperties(helper.getOptions()))) {
+ Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
+ try (AdminClient adminClient = AdminClient.create(toKafkaProperties(options))) {
Map<String, String> configs = new HashMap<>();
- helper.getOptions()
- .getOptional(LOG_RETENTION)
+ options.getOptional(LOG_RETENTION)
.ifPresent(
retention ->
configs.put(
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 343525e6..8c82a433 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
+import org.apache.flink.table.store.CoreOptions.LogConsistency;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.types.DataType;
@@ -56,8 +58,6 @@ import java.util.stream.IntStream;
import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.CoreOptions.LogChangelogMode;
-import static org.apache.flink.table.store.CoreOptions.LogConsistency;
import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;