Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/08 07:58:16 UTC
[flink-table-store] branch master updated: [hotfix] Enable ignored tests
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 61cc7364 [hotfix] Enable ignored tests
61cc7364 is described below
commit 61cc736490d0430ed1e840231bfd1bae9f0e76d6
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Jul 8 15:58:02 2022 +0800
[hotfix] Enable ignored tests
---
.../CompositePkAndMultiPartitionedTableITCase.java | 2 -
.../store/connector/StreamingWarehouseITCase.java | 4 +-
.../store/connector/sink/LogStoreSinkITCase.java | 168 ---------------------
3 files changed, 1 insertion(+), 173 deletions(-)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
index 8ae20b09..a0a6b3f6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.types.Row;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@@ -881,7 +880,6 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTestBase {
changelogRow("+I", "US Dollar", "Yen")));
}
- @Ignore
@Test
public void testEnableLogAndStreamingReadWriteMultiPartitionedRecordsWithoutPk()
throws Exception {
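[Editor's note] For context: a JUnit 4 test carrying @Ignore is reported as skipped and never executed, so deleting the annotation (and its now-unused import), as the hunk above does, is all it takes to put the test back into the run. A minimal sketch of the mechanism, with a hypothetical class and method names not taken from this commit:

    import org.junit.Ignore;
    import org.junit.Test;

    public class ExampleITCase {

        @Ignore // skipped by the JUnit 4 runner, shown as "ignored" in reports
        @Test
        public void disabledCase() {}

        @Test
        public void enabledCase() {} // runs normally once no @Ignore is present
    }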
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
index 15424e02..faa8eef7 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.types.Row;
-import org.junit.Ignore;
import org.junit.Test;
import java.time.LocalDateTime;
@@ -41,7 +40,6 @@ public class StreamingWarehouseITCase extends ReadWriteTableTestBase {
private final StreamTableEnvironment batchTableEnv =
StreamTableEnvironment.create(buildBatchEnv(1), EnvironmentSettings.inBatchMode());
- @Ignore // unstable case
@Test
public void testUserStory() throws Exception {
// Step1: define trade order table schema
@@ -88,7 +86,7 @@ public class StreamingWarehouseITCase extends ReadWriteTableTestBase {
+ " )\n"
+ "PARTITIONED BY (dt)\n"
+ "WITH (\n"
- + " 'path' = '%s',\n"
+ + " 'root-path' = '%s',\n"
+ " 'log.system' = 'kafka', "
+ " 'kafka.bootstrap.servers' = '%s');",
rootPath, getBootstrapServers());
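[Editor's note] Besides re-enabling testUserStory, the hunk above renames the table's path option from 'path' to 'root-path'. Assuming a StreamTableEnvironment (here called streamTableEnv) and the getBootstrapServers() helper visible in this test class, the corrected DDL would be built roughly as follows; the table name, columns, and the /tmp placeholder are illustrative, not from this commit:

    String ddl =
            String.format(
                    "CREATE TABLE trade_orders (\n"
                            + "  order_id BIGINT,\n"
                            + "  dt STRING)\n"
                            + "PARTITIONED BY (dt)\n"
                            + "WITH (\n"
                            + "  'root-path' = '%s',\n" // was 'path' before this commit
                            + "  'log.system' = 'kafka',\n"
                            + "  'kafka.bootstrap.servers' = '%s');",
                    "/tmp/table-store", getBootstrapServers());
    streamTableEnv.executeSql(ddl);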
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
deleted file mode 100644
index 1f4cb4e5..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
-import org.apache.flink.table.store.CoreOptions.LogConsistency;
-import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
-import org.apache.flink.table.store.file.utils.BlockingIterator;
-import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
-import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
-import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
-import org.apache.flink.table.store.kafka.KafkaLogTestUtils;
-import org.apache.flink.table.store.kafka.KafkaTableTestBase;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.types.Row;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.stream.Stream;
-
-import static org.apache.flink.table.store.connector.FileStoreITCase.CONVERTER;
-import static org.apache.flink.table.store.connector.FileStoreITCase.IDENTIFIER;
-import static org.apache.flink.table.store.connector.FileStoreITCase.SOURCE_DATA;
-import static org.apache.flink.table.store.connector.FileStoreITCase.TABLE_TYPE;
-import static org.apache.flink.table.store.connector.FileStoreITCase.buildBatchEnv;
-import static org.apache.flink.table.store.connector.FileStoreITCase.buildFileStoreTable;
-import static org.apache.flink.table.store.connector.FileStoreITCase.buildStreamEnv;
-import static org.apache.flink.table.store.connector.FileStoreITCase.buildTestSource;
-import static org.apache.flink.table.store.connector.FileStoreITCase.executeAndCollect;
-import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for table store with kafka. */
-@Ignore // TODO enable this
-public class LogStoreSinkITCase extends KafkaTableTestBase {
-
- @Test
- public void testStreamingPartitioned() throws Exception {
- innerTest("testStreamingPartitioned", false, true, true, true);
- }
-
- @Test
- public void testStreamingNonPartitioned() throws Exception {
- innerTest("testStreamingNonPartitioned", false, false, true, true);
- }
-
- @Test
- public void testBatchPartitioned() throws Exception {
- innerTest("testBatchPartitioned", true, true, true, true);
- }
-
- @Test
- public void testStreamingEventual() throws Exception {
- innerTest("testStreamingEventual", false, true, false, true);
- }
-
- @Test
- public void testStreamingPartitionedNonKey() throws Exception {
- innerTest("testStreamingPartitionedNonKey", false, true, true, false);
- }
-
- @Test
- public void testBatchPartitionedNonKey() throws Exception {
- innerTest("testBatchPartitionedNonKey", true, true, true, false);
- }
-
- private void innerTest(
- String name, boolean isBatch, boolean partitioned, boolean transaction, boolean hasPk)
- throws Exception {
- StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : buildStreamEnv();
-
- // in eventual mode, failure will result in duplicate data
- FileStoreTable table =
- buildFileStoreTable(
- isBatch || !transaction,
- TEMPORARY_FOLDER,
- partitioned ? new int[] {1} : new int[0],
- hasPk ? new int[] {2} : new int[0]);
-
- // prepare log
- DynamicTableFactory.Context context =
- KafkaLogTestUtils.testContext(
- name,
- getBootstrapServers(),
- LogChangelogMode.AUTO,
- transaction ? LogConsistency.TRANSACTIONAL : LogConsistency.EVENTUAL,
- TABLE_TYPE,
- hasPk ? new int[] {2} : new int[0]);
-
- KafkaLogStoreFactory factory = discoverKafkaLogFactory();
- KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, null);
- KafkaLogSourceProvider sourceProvider = factory.createSourceProvider(context, null, null);
-
- factory.onCreateTable(context, 3, true);
-
- try {
- // write
- new FlinkSinkBuilder(IDENTIFIER, table)
- .withInput(buildTestSource(env, isBatch))
- .withLogSinkFunction(sinkProvider.createSink())
- .build();
- env.execute();
-
- // read
- List<Row> results =
- executeAndCollect(
- new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build());
-
- Row[] expected;
- if (hasPk) {
- expected =
- partitioned
- ? new Row[] {
- Row.of(5, "p2", 1),
- Row.of(3, "p2", 5),
- Row.of(5, "p1", 1),
- Row.of(0, "p1", 2)
- }
- : new Row[] {
- Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)
- };
- } else {
- Stream<RowData> expectedStream =
- isBatch
- ? SOURCE_DATA.stream()
- : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
- expected = expectedStream.map(CONVERTER::toExternal).toArray(Row[]::new);
- }
-
- assertThat(results).containsExactlyInAnyOrder(expected);
-
- BlockingIterator<RowData, Row> iterator =
- BlockingIterator.of(
- new FlinkSourceBuilder(IDENTIFIER, table)
- .withContinuousMode(true)
- .withLogSourceProvider(sourceProvider)
- .withEnv(buildStreamEnv())
- .build()
- .executeAndCollect(),
- CONVERTER::toExternal);
- results = iterator.collectAndClose(expected.length);
- assertThat(results).containsExactlyInAnyOrder(expected);
- } finally {
- factory.onDropTable(context, true);
- }
- }
-}