You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/18 04:43:12 UTC
[incubator-paimon] 31/32: [core] Rename paimon: TableStore to Paimon 2
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 7fdfc4539417ec7b97bcda97de92a52b457001c7
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sat Mar 18 10:18:27 2023 +0800
[core] Rename paimon: TableStore to Paimon 2
---
.../java/org/apache/paimon/utils/JsonSerdeUtil.java | 4 ++--
...rovider.java => PaimonDataStreamScanProvider.java} | 4 ++--
...rovider.java => PaimonDataStreamSinkProvider.java} | 5 ++---
...oreFactory.java => AbstractFlinkTableFactory.java} | 10 +++++-----
.../java/org/apache/paimon/flink/FlinkCatalog.java | 2 +-
...reConnectorFactory.java => FlinkTableFactory.java} | 10 +++++-----
...rovider.java => PaimonDataStreamScanProvider.java} | 4 ++--
...rovider.java => PaimonDataStreamSinkProvider.java} | 5 ++---
.../org/apache/paimon/flink/action/ActionBase.java | 2 +-
.../apache/paimon/flink/action/MergeIntoAction.java | 3 +--
.../sink/{TableStoreSink.java => FlinkTableSink.java} | 14 +++++++-------
.../{TableStoreSource.java => DataTableSource.java} | 16 ++++++++--------
.../apache/paimon/flink/source/SystemTableSource.java | 2 +-
.../services/org.apache.flink.table.factories.Factory | 2 +-
...ryTest.java => AbstractFlinkTableFactoryTest.java} | 6 +++---
.../org/apache/paimon/flink/ChangelogModeTest.java | 10 +++++-----
.../org/apache/paimon/flink/CreateTableITCase.java | 2 +-
.../java/org/apache/paimon/flink/DropTableITCase.java | 2 +-
.../{TableStoreTestBase.java => FlinkTestBase.java} | 2 +-
.../org/apache/paimon/flink/ReadWriteTableITCase.java | 12 ++++++------
.../main/java/org/apache/paimon/hive/HiveCatalog.java | 6 +++---
.../java/org/apache/paimon/hive/PaimonMetaHook.java | 8 ++++----
.../main/java/org/apache/paimon/hive/PaimonSerDe.java | 2 +-
.../org/apache/paimon/hive/PaimonStorageHandler.java | 8 ++++----
...leStoreInputFormat.java => PaimonInputFormat.java} | 10 +++++-----
...ableStoreInputSplit.java => PaimonInputSplit.java} | 8 ++++----
...StoreOutputFormat.java => PaimonOutputFormat.java} | 2 +-
...StoreRecordReader.java => PaimonRecordReader.java} | 6 +++---
.../objectinspector/PaimonRowDataObjectInspector.java | 14 +++++++-------
.../paimon/hive/PaimonStorageHandlerITCase.java | 6 ++----
...eInputSplitTest.java => PaimonInputSplitTest.java} | 10 +++++-----
...ordReaderTest.java => PaimonRecordReaderTest.java} | 19 +++++++++----------
32 files changed, 105 insertions(+), 111 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 4a61dde66..dab9d1692 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -49,7 +49,7 @@ public class JsonSerdeUtil {
static {
OBJECT_MAPPER_INSTANCE = new ObjectMapper();
- OBJECT_MAPPER_INSTANCE.registerModule(createTableStoreJacksonModule());
+ OBJECT_MAPPER_INSTANCE.registerModule(createPaimonJacksonModule());
}
public static <T> T fromJson(String json, Class<T> clazz) {
@@ -76,7 +76,7 @@ public class JsonSerdeUtil {
}
}
- private static Module createTableStoreJacksonModule() {
+ private static Module createPaimonJacksonModule() {
SimpleModule module = new SimpleModule("Paimon");
registerJsonObjects(
module, TableSchema.class, SchemaSerializer.INSTANCE, SchemaSerializer.INSTANCE);
diff --git a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
similarity index 93%
rename from paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java
rename to paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
index a742f7f02..b7bea8e17 100644
--- a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java
+++ b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
@@ -26,12 +26,12 @@ import org.apache.flink.table.data.RowData;
import java.util.function.Function;
/** Paimon {@link DataStreamScanProvider}. */
-public class TableStoreDataStreamScanProvider implements DataStreamScanProvider {
+public class PaimonDataStreamScanProvider implements DataStreamScanProvider {
private final boolean isBounded;
private final Function<StreamExecutionEnvironment, DataStream<RowData>> producer;
- public TableStoreDataStreamScanProvider(
+ public PaimonDataStreamScanProvider(
boolean isBounded, Function<StreamExecutionEnvironment, DataStream<RowData>> producer) {
this.isBounded = isBounded;
this.producer = producer;
diff --git a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
similarity index 87%
rename from paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java
rename to paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
index 6d72489be..34aa8991d 100644
--- a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java
+++ b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
@@ -26,12 +26,11 @@ import org.apache.flink.table.data.RowData;
import java.util.function.Function;
/** Paimon {@link DataStreamSinkProvider}. */
-public class TableStoreDataStreamSinkProvider implements DataStreamSinkProvider {
+public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider {
private final Function<DataStream<RowData>, DataStreamSink<?>> producer;
- public TableStoreDataStreamSinkProvider(
- Function<DataStream<RowData>, DataStreamSink<?>> producer) {
+ public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink<?>> producer) {
this.producer = producer;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractTableStoreFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
similarity index 97%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractTableStoreFactory.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 1624de1a3..763f77ff5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractTableStoreFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -35,9 +35,9 @@ import org.apache.paimon.CoreOptions.LogConsistency;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.flink.sink.TableStoreSink;
+import org.apache.paimon.flink.sink.FlinkTableSink;
+import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
-import org.apache.paimon.flink.source.TableStoreSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
@@ -60,7 +60,7 @@ import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory;
/** Abstract paimon factory to create table source and table sink. */
-public abstract class AbstractTableStoreFactory
+public abstract class AbstractFlinkTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@Override
@@ -72,7 +72,7 @@ public abstract class AbstractTableStoreFactory
if (origin instanceof SystemCatalogTable) {
return new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode);
} else {
- return new TableStoreSource(
+ return new DataTableSource(
context.getObjectIdentifier(),
buildFileStoreTable(context),
isStreamingMode,
@@ -83,7 +83,7 @@ public abstract class AbstractTableStoreFactory
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
- return new TableStoreSink(
+ return new FlinkTableSink(
context.getObjectIdentifier(),
buildFileStoreTable(context),
context,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index f73317c55..1d42d3c24 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -88,7 +88,7 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public Optional<Factory> getFactory() {
- return Optional.of(new TableStoreConnectorFactory(catalog.lockFactory().orElse(null)));
+ return Optional.of(new FlinkTableFactory(catalog.lockFactory().orElse(null)));
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreConnectorFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
similarity index 92%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreConnectorFactory.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
index 3d70000f0..d2387e0cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreConnectorFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogLock;
-import org.apache.paimon.flink.sink.TableStoreSink;
+import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
@@ -38,15 +38,15 @@ import static org.apache.paimon.CoreOptions.AUTO_CREATE;
import static org.apache.paimon.flink.FlinkCatalogFactory.IDENTIFIER;
/** A paimon {@link DynamicTableFactory} to create source and sink. */
-public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
+public class FlinkTableFactory extends AbstractFlinkTableFactory {
@Nullable private final CatalogLock.Factory lockFactory;
- public TableStoreConnectorFactory() {
+ public FlinkTableFactory() {
this(null);
}
- public TableStoreConnectorFactory(@Nullable CatalogLock.Factory lockFactory) {
+ public FlinkTableFactory(@Nullable CatalogLock.Factory lockFactory) {
this.lockFactory = lockFactory;
}
@@ -84,7 +84,7 @@ public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
context.isTemporary());
}
createTableIfNeeded(context);
- TableStoreSink sink = (TableStoreSink) super.createDynamicTableSink(context);
+ FlinkTableSink sink = (FlinkTableSink) super.createDynamicTableSink(context);
sink.setLockFactory(lockFactory);
return sink;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
similarity index 93%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
index 7ab932352..af9df5b93 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
@@ -27,12 +27,12 @@ import org.apache.flink.table.data.RowData;
import java.util.function.Function;
/** Paimon {@link DataStreamScanProvider}. */
-public class TableStoreDataStreamScanProvider implements DataStreamScanProvider {
+public class PaimonDataStreamScanProvider implements DataStreamScanProvider {
private final boolean isBounded;
private final Function<StreamExecutionEnvironment, DataStream<RowData>> producer;
- public TableStoreDataStreamScanProvider(
+ public PaimonDataStreamScanProvider(
boolean isBounded, Function<StreamExecutionEnvironment, DataStream<RowData>> producer) {
this.isBounded = isBounded;
this.producer = producer;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
similarity index 88%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
index fa4c6da7b..05eaacf5a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
@@ -27,12 +27,11 @@ import org.apache.flink.table.data.RowData;
import java.util.function.Function;
/** Paimon {@link DataStreamSinkProvider}. */
-public class TableStoreDataStreamSinkProvider implements DataStreamSinkProvider {
+public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider {
private final Function<DataStream<RowData>, DataStreamSink<?>> producer;
- public TableStoreDataStreamSinkProvider(
- Function<DataStream<RowData>, DataStreamSink<?>> producer) {
+ public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink<?>> producer) {
this.producer = producer;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 7d5e935f9..8ea16956e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -105,7 +105,7 @@ public abstract class ActionBase implements Action {
* Extract {@link LogicalType}s from Flink {@link org.apache.flink.table.types.DataType}s and
* convert to Paimon {@link DataType}s.
*/
- protected List<DataType> toTableStoreDataTypes(
+ protected List<DataType> toPaimonTypes(
List<org.apache.flink.table.types.DataType> flinkDataTypes) {
return flinkDataTypes.stream()
.map(org.apache.flink.table.types.DataType::getLogicalType)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 7870702b0..9fee82607 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -667,8 +667,7 @@ public class MergeIntoAction extends ActionBase {
}
private void checkSchema(String action, Table source) {
- List<DataType> actualTypes =
- toTableStoreDataTypes(source.getResolvedSchema().getColumnDataTypes());
+ List<DataType> actualTypes = toPaimonTypes(source.getResolvedSchema().getColumnDataTypes());
List<DataType> expectedTypes = this.table.rowType().getFieldTypes();
if (!compatibleCheck(actualTypes, expectedTypes)) {
throw new IllegalStateException(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableStoreSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
similarity index 95%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableStoreSink.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 9dcbe349b..1b4838c41 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableStoreSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -33,7 +33,7 @@ import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.TableStoreDataStreamSinkProvider;
+import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
import org.apache.paimon.flink.log.LogSinkProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.operation.Lock;
@@ -53,7 +53,7 @@ import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
/** Table sink to create sink. */
-public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
+public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
private final ObjectIdentifier tableIdentifier;
private final FileStoreTable table;
@@ -64,7 +64,7 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
private boolean overwrite = false;
@Nullable private CatalogLock.Factory lockFactory;
- public TableStoreSink(
+ public FlinkTableSink(
ObjectIdentifier tableIdentifier,
FileStoreTable table,
DynamicTableFactory.Context context,
@@ -128,7 +128,7 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
// Do not sink to log store when overwrite mode
final LogSinkFunction logSinkFunction =
overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
- return new TableStoreDataStreamSinkProvider(
+ return new PaimonDataStreamSinkProvider(
(dataStream) ->
new FlinkSinkBuilder(table)
.withInput(
@@ -148,8 +148,8 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
@Override
public DynamicTableSink copy() {
- TableStoreSink copied =
- new TableStoreSink(tableIdentifier, table, context, logStoreTableFactory);
+ FlinkTableSink copied =
+ new FlinkTableSink(tableIdentifier, table, context, logStoreTableFactory);
copied.staticPartitions = new HashMap<>(staticPartitions);
copied.overwrite = overwrite;
copied.lockFactory = lockFactory;
@@ -158,7 +158,7 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
@Override
public String asSummaryString() {
- return "TableStoreSink";
+ return "PaimonSink";
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/TableStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
similarity index 95%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/TableStoreSource.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 1e7b0a6f9..390942e36 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/TableStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -31,7 +31,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.LogConsistency;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.TableStoreDataStreamScanProvider;
+import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
@@ -60,7 +60,7 @@ import static org.apache.paimon.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
* org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link StaticFileStoreSource} and
* kafka log source created by {@link LogSourceProvider}.
*/
-public class TableStoreSource extends FlinkTableSource
+public class DataTableSource extends FlinkTableSource
implements LookupTableSource, SupportsWatermarkPushDown {
private final ObjectIdentifier tableIdentifier;
@@ -71,7 +71,7 @@ public class TableStoreSource extends FlinkTableSource
@Nullable private WatermarkStrategy<RowData> watermarkStrategy;
- public TableStoreSource(
+ public DataTableSource(
ObjectIdentifier tableIdentifier,
FileStoreTable table,
boolean streaming,
@@ -89,7 +89,7 @@ public class TableStoreSource extends FlinkTableSource
null);
}
- private TableStoreSource(
+ private DataTableSource(
ObjectIdentifier tableIdentifier,
FileStoreTable table,
boolean streaming,
@@ -135,7 +135,7 @@ public class TableStoreSource extends FlinkTableSource
}
// optimization: transaction consistency and all changelog mode avoid the generation of
- // normalized nodes. See TableStoreSink.getChangelogMode validation.
+ // normalized nodes. See FlinkTableSink.getChangelogMode validation.
return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
&& options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL
? ChangelogMode.all()
@@ -168,13 +168,13 @@ public class TableStoreSource extends FlinkTableSource
.get(FlinkConnectorOptions.SCAN_PARALLELISM))
.withWatermarkStrategy(watermarkStrategy);
- return new TableStoreDataStreamScanProvider(
+ return new PaimonDataStreamScanProvider(
!streaming, env -> sourceBuilder.withEnv(env).build());
}
@Override
public DynamicTableSource copy() {
- return new TableStoreSource(
+ return new DataTableSource(
tableIdentifier,
table,
streaming,
@@ -188,7 +188,7 @@ public class TableStoreSource extends FlinkTableSource
@Override
public String asSummaryString() {
- return "TableStore-DataSource";
+ return "Paimon-DataSource";
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index 94e2d968d..dda054fed 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -86,6 +86,6 @@ public class SystemTableSource extends FlinkTableSource {
@Override
public String asSummaryString() {
- return "TableStore-SystemTable-Source";
+ return "Paimon-SystemTable-Source";
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 0502da679..b4522f01a 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,6 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.paimon.flink.TableStoreConnectorFactory
+org.apache.paimon.flink.FlinkTableFactory
org.apache.paimon.flink.FlinkCatalogFactory
org.apache.paimon.flink.kafka.KafkaLogStoreFactory
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractTableStoreFactoryTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
similarity index 93%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractTableStoreFactoryTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
index d88fd5b59..332c1bb9b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractTableStoreFactoryTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
@@ -29,8 +29,8 @@ import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link AbstractTableStoreFactory}. */
-public class AbstractTableStoreFactoryTest {
+/** Test for {@link AbstractFlinkTableFactory}. */
+public class AbstractFlinkTableFactoryTest {
@Test
public void testSchemaEquals() {
@@ -55,6 +55,6 @@ public class AbstractTableStoreFactoryTest {
}
private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
- assertThat(AbstractTableStoreFactory.schemaEquals(r1, r2)).isEqualTo(expectEquals);
+ assertThat(AbstractFlinkTableFactory.schemaEquals(r1, r2)).isEqualTo(expectEquals);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
index 3b8e255df..8f35a145f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.flink.sink.TableStoreSink;
-import org.apache.paimon.flink.source.TableStoreSource;
+import org.apache.paimon.flink.sink.FlinkTableSink;
+import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
@@ -80,11 +80,11 @@ public class ChangelogModeTest {
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
- TableStoreSource source =
- new TableStoreSource(identifier, table, true, null, logStoreTableFactory);
+ DataTableSource source =
+ new DataTableSource(identifier, table, true, null, logStoreTableFactory);
assertThat(source.getChangelogMode()).isEqualTo(expectSource);
- TableStoreSink sink = new TableStoreSink(identifier, table, null, null);
+ FlinkTableSink sink = new FlinkTableSink(identifier, table, null, null);
assertThat(sink.getChangelogMode(ChangelogMode.all())).isEqualTo(expectSink);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CreateTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CreateTableITCase.java
index 909905bb4..2050d20c3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CreateTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CreateTableITCase.java
@@ -37,7 +37,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.junit.jupiter.params.provider.Arguments.arguments;
/** IT cases for testing create managed table ddl. */
-public class CreateTableITCase extends TableStoreTestBase {
+public class CreateTableITCase extends FlinkTestBase {
@Override
public void prepareEnv(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DropTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DropTableITCase.java
index 550d25a1b..44fbbe2d2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DropTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DropTableITCase.java
@@ -36,7 +36,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.params.provider.Arguments.arguments;
/** IT cases for testing drop managed table ddl. */
-public class DropTableITCase extends TableStoreTestBase {
+public class DropTableITCase extends FlinkTestBase {
@Override
public void prepareEnv(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TableStoreTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TableStoreTestBase.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
index 14426266e..0769dcfd9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TableStoreTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
/** End-to-end test base for paimon. */
-public abstract class TableStoreTestBase extends AbstractTestBase {
+public abstract class FlinkTestBase extends AbstractTestBase {
public static final String CURRENT_CATALOG = "catalog";
public static final String CURRENT_DATABASE = "default";
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 1d823b378..df70ec588 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -39,7 +39,7 @@ import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.sink.TableStoreSink;
+import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -57,10 +57,10 @@ import java.util.Map;
import java.util.UUID;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static org.apache.paimon.flink.AbstractTableStoreFactory.buildFileStoreTable;
+import static org.apache.paimon.flink.AbstractFlinkTableFactory.buildFileStoreTable;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
-import static org.apache.paimon.flink.TableStoreTestBase.createResolvedTable;
+import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bExeEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery;
@@ -1154,9 +1154,9 @@ public class ReadWriteTableITCase extends AbstractTestBase {
.createTable(FlinkCatalog.fromCatalogTable(context.getCatalogTable()));
DynamicTableSink tableSink =
- new TableStoreSink(
+ new FlinkTableSink(
context.getObjectIdentifier(), buildFileStoreTable(context), context, null);
- assertThat(tableSink).isInstanceOf(TableStoreSink.class);
+ assertThat(tableSink).isInstanceOf(FlinkTableSink.class);
// 2. get sink provider
DynamicTableSink.SinkRuntimeProvider provider =
@@ -1169,7 +1169,7 @@ public class ReadWriteTableITCase extends AbstractTestBase {
bExeEnv.fromCollection(Collections.singletonList(GenericRowData.of()));
DataStreamSink<?> sink = sinkProvider.consumeDataStream(null, mockSource);
Transformation<?> transformation = sink.getTransformation();
- // until a PartitionTransformation, see TableStore.SinkBuilder.build()
+ // until a PartitionTransformation, see FlinkSinkBuilder.build()
while (!(transformation instanceof PartitionTransformation)) {
assertThat(transformation.getParallelism()).isIn(1, expectedParallelism);
transformation = transformation.getInputs().get(0);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 9ff6c3d39..286d9908d 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -72,9 +72,9 @@ public class HiveCatalog extends AbstractCatalog {
// we don't include paimon-hive-connector as dependencies because it depends on
// hive-exec
private static final String INPUT_FORMAT_CLASS_NAME =
- "org.apache.paimon.hive.mapred.TableStoreInputFormat";
+ "org.apache.paimon.hive.mapred.PaimonInputFormat";
private static final String OUTPUT_FORMAT_CLASS_NAME =
- "org.apache.paimon.hive.mapred.TableStoreOutputFormat";
+ "org.apache.paimon.hive.mapred.PaimonOutputFormat";
private static final String SERDE_CLASS_NAME = "org.apache.paimon.hive.PaimonSerDe";
private static final String STORAGE_HANDLER_CLASS_NAME =
"org.apache.paimon.hive.PaimonStorageHandler";
@@ -162,7 +162,7 @@ public class HiveCatalog extends AbstractCatalog {
.filter(
tableName -> {
Identifier identifier = new Identifier(databaseName, tableName);
- // the environment here may not be able to access non-TableStore
+ // the environment here may not be able to access non-paimon
// tables.
// so we just check the schema file first
return schemaFileExists(identifier)
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
index e27292382..1b8c8baa2 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
@@ -21,8 +21,8 @@ package org.apache.paimon.hive;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.paimon.hive.mapred.TableStoreInputFormat;
-import org.apache.paimon.hive.mapred.TableStoreOutputFormat;
+import org.apache.paimon.hive.mapred.PaimonInputFormat;
+import org.apache.paimon.hive.mapred.PaimonOutputFormat;
import org.apache.paimon.utils.Preconditions;
/**
@@ -39,8 +39,8 @@ public class PaimonMetaHook implements HiveMetaHook {
+ "with PARTITIONED BY clause. If you want to query from a partitioned table, "
+ "please add partition columns into the ordinary table columns.");
- table.getSd().setInputFormat(TableStoreInputFormat.class.getCanonicalName());
- table.getSd().setOutputFormat(TableStoreOutputFormat.class.getCanonicalName());
+ table.getSd().setInputFormat(PaimonInputFormat.class.getCanonicalName());
+ table.getSd().setOutputFormat(PaimonOutputFormat.class.getCanonicalName());
}
@Override
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
index 2e4297352..5e2d45745 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
@@ -56,7 +56,7 @@ public class PaimonSerDe extends AbstractSerDe {
@Override
public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
throw new UnsupportedOperationException(
- "TableStoreSerDe currently only supports deserialization.");
+ "PaimonSerDe currently only supports deserialization.");
}
@Override
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
index 37cf0cf4e..1ee77bc6e 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
@@ -32,8 +32,8 @@ import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.paimon.hive.mapred.TableStoreInputFormat;
-import org.apache.paimon.hive.mapred.TableStoreOutputFormat;
+import org.apache.paimon.hive.mapred.PaimonInputFormat;
+import org.apache.paimon.hive.mapred.PaimonOutputFormat;
import java.util.Map;
import java.util.Properties;
@@ -45,12 +45,12 @@ public class PaimonStorageHandler implements HiveStoragePredicateHandler, HiveSt
@Override
public Class<? extends InputFormat> getInputFormatClass() {
- return TableStoreInputFormat.class;
+ return PaimonInputFormat.class;
}
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
- return TableStoreOutputFormat.class;
+ return PaimonOutputFormat.class;
}
@Override
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
similarity index 92%
rename from paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputFormat.java
rename to paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 1712edfd4..50d2a8452 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -47,7 +47,7 @@ import java.util.Optional;
* {@link InputFormat} for paimon. It divides all files into {@link InputSplit}s (one split per
* bucket) and creates {@link RecordReader} for each split.
*/
-public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer> {
+public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> {
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
@@ -55,18 +55,18 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
DataTableScan scan = table.newScan();
createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
return scan.plan().splits.stream()
- .map(split -> new TableStoreInputSplit(table.location().toString(), split))
- .toArray(TableStoreInputSplit[]::new);
+ .map(split -> new PaimonInputSplit(table.location().toString(), split))
+ .toArray(PaimonInputSplit[]::new);
}
@Override
public RecordReader<Void, RowDataContainer> getRecordReader(
InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
FileStoreTable table = createFileStoreTable(jobConf);
- TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
+ PaimonInputSplit split = (PaimonInputSplit) inputSplit;
ReadBuilder readBuilder = table.newReadBuilder();
createPredicate(table.schema(), jobConf).ifPresent(readBuilder::withFilter);
- return new TableStoreRecordReader(
+ return new PaimonRecordReader(
readBuilder,
split,
table.schema().fieldNames(),
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputSplit.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
similarity index 93%
rename from paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputSplit.java
rename to paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
index a7fe4e6ef..99871d1e6 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputSplit.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
@@ -33,7 +33,7 @@ import java.util.Objects;
/**
* {@link FileSplit} for paimon. It contains all files to read from a certain partition and bucket.
*/
-public class TableStoreInputSplit extends FileSplit {
+public class PaimonInputSplit extends FileSplit {
private static final String[] ANYWHERE = new String[] {"*"};
@@ -41,9 +41,9 @@ public class TableStoreInputSplit extends FileSplit {
private DataSplit split;
// public no-argument constructor for deserialization
- public TableStoreInputSplit() {}
+ public PaimonInputSplit() {}
- public TableStoreInputSplit(String path, DataSplit split) {
+ public PaimonInputSplit(String path, DataSplit split) {
this.path = path;
this.split = split;
}
@@ -103,7 +103,7 @@ public class TableStoreInputSplit extends FileSplit {
if (o == null || getClass() != o.getClass()) {
return false;
}
- TableStoreInputSplit that = (TableStoreInputSplit) o;
+ PaimonInputSplit that = (PaimonInputSplit) o;
return Objects.equals(path, that.path) && Objects.equals(split, that.split);
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreOutputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
similarity index 94%
rename from paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreOutputFormat.java
rename to paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
index b1119993d..5e33bc5df 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreOutputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
@@ -28,7 +28,7 @@ import org.apache.paimon.data.InternalRow;
import java.io.IOException;
/** {@link OutputFormat} for table split. Currently useless. */
-public class TableStoreOutputFormat implements OutputFormat<InternalRow, InternalRow> {
+public class PaimonOutputFormat implements OutputFormat<InternalRow, InternalRow> {
@Override
public RecordWriter<InternalRow, InternalRow> getRecordWriter(
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreRecordReader.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
similarity index 95%
rename from paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreRecordReader.java
rename to paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
index 5485b620a..812f946d2 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreRecordReader.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
@@ -39,7 +39,7 @@ import java.util.List;
* columnNames} this reader will still produce records of the original schema. However, columns not
* in {@code selectedColumns} will be null.
*/
-public class TableStoreRecordReader implements RecordReader<Void, RowDataContainer> {
+public class PaimonRecordReader implements RecordReader<Void, RowDataContainer> {
private final RecordReaderIterator<InternalRow> iterator;
private final long splitLength;
@@ -48,9 +48,9 @@ public class TableStoreRecordReader implements RecordReader<Void, RowDataContain
private float progress;
- public TableStoreRecordReader(
+ public PaimonRecordReader(
ReadBuilder readBuilder,
- TableStoreInputSplit split,
+ PaimonInputSplit split,
List<String> columnNames,
List<String> selectedColumns)
throws IOException {
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonRowDataObjectInspector.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonRowDataObjectInspector.java
index e9dc491ad..6d61daf80 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonRowDataObjectInspector.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonRowDataObjectInspector.java
@@ -35,8 +35,8 @@ import java.util.stream.Collectors;
/** {@link StructObjectInspector} for {@link InternalRow}. */
public class PaimonRowDataObjectInspector extends StructObjectInspector {
- private final List<TableStoreStructField> structFields;
- private final Map<String, TableStoreStructField> structFieldMap;
+ private final List<PaimonStructField> structFields;
+ private final Map<String, PaimonStructField> structFieldMap;
private final String typeName;
public PaimonRowDataObjectInspector(
@@ -48,8 +48,8 @@ public class PaimonRowDataObjectInspector extends StructObjectInspector {
for (int i = 0; i < fieldNames.size(); i++) {
String name = fieldNames.get(i);
DataType logicalType = fieldTypes.get(i);
- TableStoreStructField structField =
- new TableStoreStructField(
+ PaimonStructField structField =
+ new PaimonStructField(
name,
PaimonObjectInspectorFactory.create(logicalType),
i,
@@ -84,7 +84,7 @@ public class PaimonRowDataObjectInspector extends StructObjectInspector {
@Override
public Object getStructFieldData(Object o, StructField structField) {
InternalRow rowData = (InternalRow) o;
- return ((TableStoreStructField) structField).fieldGetter.getFieldOrNull(rowData);
+ return ((PaimonStructField) structField).fieldGetter.getFieldOrNull(rowData);
}
@Override
@@ -105,7 +105,7 @@ public class PaimonRowDataObjectInspector extends StructObjectInspector {
return Category.STRUCT;
}
- private static class TableStoreStructField implements StructField {
+ private static class PaimonStructField implements StructField {
private final String name;
private final ObjectInspector objectInspector;
@@ -113,7 +113,7 @@ public class PaimonRowDataObjectInspector extends StructObjectInspector {
private final InternalRow.FieldGetter fieldGetter;
private final String comment;
- private TableStoreStructField(
+ private PaimonStructField(
String name,
ObjectInspector objectInspector,
int idx,
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index 0278d0736..1430d5555 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -33,6 +33,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.hive.mapred.PaimonInputFormat;
import org.apache.paimon.hive.objectinspector.PaimonObjectInspectorFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
@@ -61,10 +62,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
-/**
- * IT cases for {@link PaimonStorageHandler} and {@link
- * org.apache.paimon.hive.mapred.TableStoreInputFormat}.
- */
+/** IT cases for {@link PaimonStorageHandler} and {@link PaimonInputFormat}. */
@RunWith(FlinkEmbeddedHiveRunner.class)
public class PaimonStorageHandlerITCase {
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreInputSplitTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
similarity index 92%
rename from paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreInputSplitTest.java
rename to paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
index e9a79b274..2c8574edb 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreInputSplitTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
@@ -35,8 +35,8 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link TableStoreInputSplit}. */
-public class TableStoreInputSplitTest {
+/** Tests for {@link PaimonInputSplit}. */
+public class PaimonInputSplitTest {
@TempDir java.nio.file.Path tempDir;
@@ -50,8 +50,8 @@ public class TableStoreInputSplitTest {
}
BinaryRow wantedPartition = generated.get(0).partition;
- TableStoreInputSplit split =
- new TableStoreInputSplit(
+ PaimonInputSplit split =
+ new PaimonInputSplit(
tempDir.toString(),
new DataSplit(
ThreadLocalRandom.current().nextLong(100),
@@ -70,7 +70,7 @@ public class TableStoreInputSplitTest {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
DataInputStream input = new DataInputStream(bais);
- TableStoreInputSplit actual = new TableStoreInputSplit();
+ PaimonInputSplit actual = new PaimonInputSplit();
actual.readFields(input);
assertThat(actual).isEqualTo(split);
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreRecordReaderTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
similarity index 92%
rename from paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreRecordReaderTest.java
rename to paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index 2053d670e..b23796647 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreRecordReaderTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -49,8 +49,8 @@ import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link TableStoreRecordReader}. */
-public class TableStoreRecordReaderTest {
+/** Tests for {@link PaimonRecordReader}. */
+public class PaimonRecordReaderTest {
@TempDir java.nio.file.Path tempDir;
private String commitUser;
@@ -83,7 +83,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRow.ofKind(RowKind.DELETE, 2L, BinaryString.fromString("Hello")));
commit.commit(0, write.prepareCommit(true, 0));
- TableStoreRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0);
+ PaimonRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0);
RowDataContainer container = reader.createValue();
Set<String> actual = new HashSet<>();
while (reader.next(null, container)) {
@@ -122,7 +122,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRow.of(1, BinaryString.fromString("Hi")));
commit.commit(0, write.prepareCommit(true, 0));
- TableStoreRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0);
+ PaimonRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0);
RowDataContainer container = reader.createValue();
Map<String, Integer> actual = new HashMap<>();
while (reader.next(null, container)) {
@@ -160,8 +160,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRow.of(1, 10L, BinaryString.fromString("Hi")));
commit.commit(0, write.prepareCommit(true, 0));
- TableStoreRecordReader reader =
- read(table, BinaryRow.EMPTY_ROW, 0, Arrays.asList("c", "a"));
+ PaimonRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0, Arrays.asList("c", "a"));
RowDataContainer container = reader.createValue();
Map<String, Integer> actual = new HashMap<>();
while (reader.next(null, container)) {
@@ -176,19 +175,19 @@ public class TableStoreRecordReaderTest {
assertThat(actual).isEqualTo(expected);
}
- private TableStoreRecordReader read(FileStoreTable table, BinaryRow partition, int bucket)
+ private PaimonRecordReader read(FileStoreTable table, BinaryRow partition, int bucket)
throws Exception {
return read(table, partition, bucket, table.schema().fieldNames());
}
- private TableStoreRecordReader read(
+ private PaimonRecordReader read(
FileStoreTable table, BinaryRow partition, int bucket, List<String> selectedColumns)
throws Exception {
for (DataSplit split : table.newScan().plan().splits) {
if (split.partition().equals(partition) && split.bucket() == bucket) {
- return new TableStoreRecordReader(
+ return new PaimonRecordReader(
table.newReadBuilder(),
- new TableStoreInputSplit(tempDir.toString(), split),
+ new PaimonInputSplit(tempDir.toString(), split),
table.schema().fieldNames(),
selectedColumns);
}