You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jane Chan (Jira)" <ji...@apache.org> on 2022/05/16 15:07:00 UTC
[jira] [Updated] (FLINK-27652) CompactManager.Rewriter cannot handle different partition keys invoked compaction
[ https://issues.apache.org/jira/browse/FLINK-27652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jane Chan updated FLINK-27652:
------------------------------
Description:
h3. Issue Description
When {{commit.force-compact}} is enabled for a partitioned managed table, there is a chance that successive synchronized
writes fail. The current implementation of {{CompactManager.Rewriter}} is an anonymous class in {{FileStoreWriteImpl}}. However, the {{partition}} and {{bucket}} are referenced as local variables, and this may cause the {{rewrite}} method to look up a data file under the wrong {{partition}} and {{bucket}}.
h3. Root Cause
{code:java}
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.FileNotFoundException: File file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not exist or the user running Flink ('jane.cjm') has insufficient permissions to access it. at org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
{code}
However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to partition Autumn. It seems the rewriter combined this file with the wrong partition/bucket.
h3. How to Reproduce
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.store.connector;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
 * A reproducible case for FLINK-27652.
 *
 * <p>Creates table T1 partitioned by f1, sets 'commit.force-compact' to 'true', and then runs a
 * fixed sequence of INSERT statements that write to several partitions, waiting for each INSERT
 * to finish before issuing the next one. The test contains no assertions: it succeeds only if
 * none of the statements throws.
 */
public class ForceCompactionITCase extends FileStoreTableITCase {
/**
 * Returns the single DDL statement used by this test: a CREATE TABLE for T1 with columns
 * f0 INT, f1 STRING, f2 STRING, partitioned by f1.
 *
 * <p>NOTE(review): presumably executed by FileStoreTableITCase during setup - confirm against
 * the base class.
 */
@Override
protected List<String> ddl() {
return Collections.singletonList(
"CREATE TABLE IF NOT EXISTS T1 ("
+ "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1)");
}
/**
 * Runs the write sequence that triggers the failure described in the issue.
 *
 * @throws ExecutionException if one of the awaited INSERT statements fails
 * @throws InterruptedException if the thread is interrupted while awaiting a statement
 */
@Test
public void test() throws ExecutionException, InterruptedException {
// Table options are set before any data is written. These two calls are not awaited.
// NOTE(review): bEnv is inherited; presumably the batch table environment - confirm.
bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')");
bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
// First write: one statement covering three partitions (Winter, Spring, Summer).
bEnv.executeSql(
"INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming')"
+ ",(2, 'Winter', 'The First Snowflake'), "
+ "(2, 'Spring', 'The First Rose in Spring'), "
+ "(7, 'Summer', 'Summertime Sadness')")
.await();
// Three single-row writes; the Autumn partition is written for the first time here.
bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await();
bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await();
bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await();
// One statement writing to two partitions (Summer and Spring).
bEnv.executeSql(
"INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
+ "(4, 'Spring', 'Spring Water')")
.await();
// Six further writes, each touching both Summer and Autumn in the same statement.
// Only the f0 value of the Summer row changes; the Autumn row is identical every time.
bEnv.executeSql(
"INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(6666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(66666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(666666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(6666666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
}
}
{code}
was:
h3. Issue Description
When enabling {{commit.force-compact}} for the partitioned managed table, there had a chance that the successive synchronized
writes got failure. The current impl of {{CompactManager.Rewriter}} is an anonymous class in {{FileStoreWriteImpl}}. However, the {{partition}} and {{bucket}} are referenced as local variables; and this may lead to the {{rewrite}} method messing up with the wrong data file with the {{partition}} and {{bucket}.
h3. Root Cause
{code:java}
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.FileNotFoundException: File file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not exist or the user running Flink ('jane.cjm') has insufficient permissions to access it. at org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
{code}
However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to partition Autumn. It seems like the rewriter found the wrong partition/bucket with the wrong file.
h3. How to Reproduce
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.store.connector;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
 * A reproducible case for FLINK-27652.
 *
 * <p>Creates table T1 partitioned by f1, sets 'commit.force-compact' to 'true', and then runs a
 * fixed sequence of INSERT statements that write to several partitions, waiting for each INSERT
 * to finish before issuing the next one. The test contains no assertions: it succeeds only if
 * none of the statements throws.
 */
public class ForceCompactionITCase extends FileStoreTableITCase {
/**
 * Returns the single DDL statement used by this test: a CREATE TABLE for T1 with columns
 * f0 INT, f1 STRING, f2 STRING, partitioned by f1.
 *
 * <p>NOTE(review): presumably executed by FileStoreTableITCase during setup - confirm against
 * the base class.
 */
@Override
protected List<String> ddl() {
return Collections.singletonList(
"CREATE TABLE IF NOT EXISTS T1 ("
+ "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1)");
}
/**
 * Runs the write sequence that triggers the failure described in the issue.
 *
 * @throws ExecutionException if one of the awaited INSERT statements fails
 * @throws InterruptedException if the thread is interrupted while awaiting a statement
 */
@Test
public void test() throws ExecutionException, InterruptedException {
// Table options are set before any data is written. These two calls are not awaited.
// NOTE(review): bEnv is inherited; presumably the batch table environment - confirm.
bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')");
bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
// First write: one statement covering three partitions (Winter, Spring, Summer).
bEnv.executeSql(
"INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming')"
+ ",(2, 'Winter', 'The First Snowflake'), "
+ "(2, 'Spring', 'The First Rose in Spring'), "
+ "(7, 'Summer', 'Summertime Sadness')")
.await();
// Three single-row writes; the Autumn partition is written for the first time here.
bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await();
bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await();
bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await();
// One statement writing to two partitions (Summer and Spring).
bEnv.executeSql(
"INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
+ "(4, 'Spring', 'Spring Water')")
.await();
// Six further writes, each touching both Summer and Autumn in the same statement.
// Only the f0 value of the Summer row changes; the Autumn row is identical every time.
bEnv.executeSql(
"INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(6666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(66666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(666666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
bEnv.executeSql(
"INSERT INTO T1 VALUES(6666666, 'Summer', 'Summer Vibe'),"
+ " (9, 'Autumn', 'Wake Me Up When September Ends')")
.await();
}
}
{code}
> CompactManager.Rewriter cannot handle different partition keys invoked compaction
> ---------------------------------------------------------------------------------
>
> Key: FLINK-27652
> URL: https://issues.apache.org/jira/browse/FLINK-27652
> Project: Flink
> Issue Type: Bug
> Components: Table Store
> Affects Versions: table-store-0.2.0
> Reporter: Jane Chan
> Priority: Major
> Fix For: table-store-0.2.0
>
>
> h3. Issue Description
> When {{commit.force-compact}} is enabled for a partitioned managed table, there is a chance that successive synchronized
> writes fail. The current implementation of {{CompactManager.Rewriter}} is an anonymous class in {{FileStoreWriteImpl}}. However, the {{partition}} and {{bucket}} are referenced as local variables, and this may cause the {{rewrite}} method to look up a data file under the wrong {{partition}} and {{bucket}}.
> h3. Root Cause
> {code:java}
> Caused by: java.io.IOException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.FileNotFoundException: File file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not exist or the user running Flink ('jane.cjm') has insufficient permissions to access it. at org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
> {code}
> However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to partition Autumn. It seems the rewriter combined this file with the wrong partition/bucket.
> h3. How to Reproduce
> {code:java}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements. See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.flink.table.store.connector;
> import org.junit.Test;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.ExecutionException;
> /** A reproducible case. */
> public class ForceCompactionITCase extends FileStoreTableITCase {
> @Override
> protected List<String> ddl() {
> return Collections.singletonList(
> "CREATE TABLE IF NOT EXISTS T1 ("
> + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1)");
> }
> @Test
> public void test() throws ExecutionException, InterruptedException {
> bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')");
> bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming')"
> + ",(2, 'Winter', 'The First Snowflake'), "
> + "(2, 'Spring', 'The First Rose in Spring'), "
> + "(7, 'Summer', 'Summertime Sadness')")
> .await();
> bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await();
> bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await();
> bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
> + "(4, 'Spring', 'Spring Water')")
> .await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
> + " (9, 'Autumn', 'Wake Me Up When September Ends')")
> .await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'),"
> + " (9, 'Autumn', 'Wake Me Up When September Ends')")
> .await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(6666, 'Summer', 'Summer Vibe'),"
> + " (9, 'Autumn', 'Wake Me Up When September Ends')")
> .await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(66666, 'Summer', 'Summer Vibe'),"
> + " (9, 'Autumn', 'Wake Me Up When September Ends')")
> .await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(666666, 'Summer', 'Summer Vibe'),"
> + " (9, 'Autumn', 'Wake Me Up When September Ends')")
> .await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(6666666, 'Summer', 'Summer Vibe'),"
> + " (9, 'Autumn', 'Wake Me Up When September Ends')")
> .await();
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)