You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/04/02 07:45:03 UTC

[GitHub] [iceberg] openinx opened a new pull request #2410: Flink: Support SQL primary key

openinx opened a new pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410


   This is built on top of https://github.com/apache/iceberg/pull/2354 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r632141782



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.List;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.flink.FlinkTestBase;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+public class ChangeLogTableTestBase extends FlinkTestBase {
+  private volatile TableEnvironment tEnv = null;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @After
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s", name.getMethodName());
+    BoundedTableFactory.clearDataSets();
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          EnvironmentSettings settings = EnvironmentSettings
+              .newInstance()
+              .useBlinkPlanner()
+              .inStreamingMode()
+              .build();
+
+          StreamExecutionEnvironment env = StreamExecutionEnvironment
+              .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .enableCheckpointing(400)
+              .setMaxParallelism(1)
+              .setParallelism(1);
+
+          tEnv = StreamTableEnvironment.create(env, settings);
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  protected static Row insertRow(Object... values) {
+    return Row.ofKind(RowKind.INSERT, values);
+  }
+
+  protected static Row deleteRow(Object... values) {
+    return Row.ofKind(RowKind.DELETE, values);
+  }
+
+  protected static Row updateBeforeRow(Object... values) {
+    return Row.ofKind(RowKind.UPDATE_BEFORE, values);
+  }
+
+  protected static Row updateAfterRow(Object... values) {
+    return Row.ofKind(RowKind.UPDATE_AFTER, values);
+  }
+
+  protected static <T> List<T> listJoin(List<List<T>> lists) {

Review comment:
       nit: is this a tiny bit simpler?
   ```
   return lists.stream()
           .flatMap(List::stream)
           .collect(Collectors.toList());
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-835165046


   @dixingxing0 yeah I think this should definitely be a part of 0.12.0 milestone. It's late night at my timezone, I am planning to review tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631601680



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.flink.FlinkTestBase;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+public class ChangeLogTableTestBase extends FlinkTestBase {
+  private volatile TableEnvironment tEnv = null;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @After
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s", name.getMethodName());
+    BoundedTableFactory.clearDataSets();
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          EnvironmentSettings settings = EnvironmentSettings
+              .newInstance()
+              .useBlinkPlanner()
+              .inStreamingMode()
+              .build();
+
+          StreamExecutionEnvironment env = StreamExecutionEnvironment
+              .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .enableCheckpointing(400)
+              .setMaxParallelism(1)
+              .setParallelism(1);
+
+          tEnv = StreamTableEnvironment.create(env, settings);
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  private static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
+      "+I", RowKind.INSERT,

Review comment:
       Sounds good to me !




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r632140150



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class BoundedTableFactory implements DynamicTableSourceFactory {
+  private static final AtomicInteger DATA_SET_ID = new AtomicInteger(0);
+  private static final Map<String, List<List<Row>>> DATA_SETS = new HashMap<>();
+
+  private static final ConfigOption<String> DATA_ID = ConfigOptions.key("data-id").stringType().noDefaultValue();
+
+  public static String registerDataSet(List<List<Row>> dataSet) {
+    String dataSetId = String.valueOf(DATA_SET_ID.incrementAndGet());
+    DATA_SETS.put(dataSetId, dataSet);
+    return dataSetId;
+  }
+
+  public static void clearDataSets() {
+    DATA_SETS.clear();
+  }
+
+  @Override
+  public DynamicTableSource createDynamicTableSource(Context context) {
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+    Configuration configuration = Configuration.fromMap(context.getCatalogTable().getOptions());
+    String dataId = configuration.getString(DATA_ID);
+    Preconditions.checkArgument(DATA_SETS.containsKey(dataId),
+        "data-id %s does not found in registered data set.", dataId);
+
+    return new BoundedTableSource(DATA_SETS.get(dataId), tableSchema);
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return "BoundedSource";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return ImmutableSet.of(DATA_ID);
+  }
+
+  private static class BoundedTableSource implements ScanTableSource {
+
+    private final List<List<Row>> elementsPerCheckpoint;
+    private final TableSchema tableSchema;
+
+    private BoundedTableSource(List<List<Row>> elementsPerCheckpoint, TableSchema tableSchema) {
+      this.elementsPerCheckpoint = elementsPerCheckpoint;
+      this.tableSchema = tableSchema;
+    }
+
+    private BoundedTableSource(BoundedTableSource toCopy) {
+      this.elementsPerCheckpoint = toCopy.elementsPerCheckpoint;
+      this.tableSchema = toCopy.tableSchema;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+      return ChangelogMode.newBuilder()
+          .addContainedKind(RowKind.INSERT)
+          .addContainedKind(RowKind.DELETE)
+          .addContainedKind(RowKind.UPDATE_BEFORE)
+          .addContainedKind(RowKind.UPDATE_AFTER)
+          .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+      return new DataStreamScanProvider() {
+        @Override
+        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
+          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);

Review comment:
       so we want the pattern of checkpoint cycles like this? Only small downside is slightly longer runtime. Now we need to wait for 6 checkpoints instead of 3.
   ```
   insert batch 1, idle, insert batch 2, idle, insert batch 3, idle, ...
   ```
   
   Regardless, it is helpful to add some comments to explain the decision. It is very difficult to know the intention of the magic number of 2 otherwise.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-842750725


   @stevenzwu @jackye1995 , any other concerns ? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-813867546


   @edgarRd You could see the diff [here](https://github.com/apache/iceberg/pull/2410/commits/c8a901cb44d791be1a57cca8f08b58fc179ca9a9). 
   
   @zhangwcoder   From my understanding since we community's last sync,  I think we will still not expose v2 to end users but I think there will be some great features that will be merged in next weeks such as rewrite delete files etc.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
edgarRd commented on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-812575674


   @openinx Would it be possible to set the base branch to `jackye1995:row-id-api` to see the diff that applies to your branch only? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631519578



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
##########
@@ -121,6 +126,40 @@ public void testCreateTable() throws TableNotExistException {
     Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
   }
 
+  @Test
+  public void testCreateTableWithPrimaryKey() throws Exception {

Review comment:
       We apache flink don't support altering primary key now, but we have [proposal](https://issues.apache.org/jira/browse/FLINK-21634?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) to extend the flink's alter table. Maybe we will support it in the following releases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631515294



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -61,7 +65,22 @@ public static Schema convert(TableSchema schema) {
     RowType root = (RowType) schemaType;
     Type converted = root.accept(new FlinkTypeToType(root));
 
-    return new Schema(converted.asStructType().fields());
+    Schema iSchema = new Schema(converted.asStructType().fields());
+    return freshIdentifierFieldIds(iSchema, schema);
+  }
+
+  private static Schema freshIdentifierFieldIds(Schema iSchema, TableSchema schema) {

Review comment:
       I don't think we can reuse the `TypeUtil.refreshIdentifierFields(Types.StructType freshSchema, Schema baseSchema)` because in this method we've had an existing identifier field id list inside the `baseSchema`.  While for this case,  the identifier fields are actually came from flink's `TableSchema`,  converting the flink's `TableSchema` with primary keys to the iceberg table `schema` with identifier field id list is exactly the thing we are trying to accomplish in this method. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631512356



##########
File path: flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
##########
@@ -209,9 +209,18 @@ public static StructLikeSet expectedRowSet(Table table, Record... records) {
   }
 
   public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException {
+    table.refresh();

Review comment:
       This `table.refresh()` here is to get the latest snapshot id, so that the follow `actualRowSet` could read the records from the latest snapshot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r632136333



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
##########
@@ -270,4 +275,54 @@ private void checkInconsistentType(
         Types.StructType.of(Types.NestedField.optional(0, "f0", icebergExpectedType)),
         FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(RowType.of(flinkType))).asStruct());
   }
+
+  @Test
+  public void testConvertFlinkSchemaBaseOnIcebergSchema() {
+    Schema baseSchema = new Schema(
+        Lists.newArrayList(
+            Types.NestedField.required(101, "int", Types.IntegerType.get()),
+            Types.NestedField.optional(102, "string", Types.StringType.get())
+        ),
+        Sets.newHashSet(101, 102)
+    );
+
+    TableSchema flinkSchema = TableSchema.builder()
+        .field("int", DataTypes.INT().notNull())
+        .field("string", DataTypes.STRING().nullable())
+        .primaryKey("int")
+        .build();
+    Schema convertedSchema = FlinkSchemaUtil.convert(baseSchema, flinkSchema);
+    Assert.assertEquals(baseSchema.asStruct(), convertedSchema.asStruct());
+    Assert.assertEquals(ImmutableSet.of(101), convertedSchema.identifierFieldIds());
+  }
+
+  @Test
+  public void testConvertFlinkSchemaWithPrimaryKeys() {
+    Schema iSchema = new Schema(

Review comment:
       nit: iSchema -> icebergSchema? It took me a few seconds to guess what `i` stands for here :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangwcoder commented on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
zhangwcoder commented on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-812468147


   @openinx Hi bro, I'm paying attention to this function and when will it be completed.  
   Is it will released in the version of  0.12.0  ?  This is my Issues  #2409 ,  I'm focus in syncing data from mysql to iceberg table in flink stream job .  
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631601167



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class BoundedTableFactory implements DynamicTableSourceFactory {
+  private static final AtomicInteger DATA_SET_ID = new AtomicInteger(0);
+  private static final Map<String, List<List<Row>>> DATA_SETS = new HashMap<>();
+
+  private static final ConfigOption<String> DATA_ID = ConfigOptions.key("data-id").stringType().noDefaultValue();
+
+  public static String registerDataSet(List<List<Row>> dataSet) {
+    String dataSetId = String.valueOf(DATA_SET_ID.incrementAndGet());
+    DATA_SETS.put(dataSetId, dataSet);
+    return dataSetId;
+  }
+
+  public static void clearDataSets() {
+    DATA_SETS.clear();
+  }
+
+  @Override
+  public DynamicTableSource createDynamicTableSource(Context context) {
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+    Configuration configuration = Configuration.fromMap(context.getCatalogTable().getOptions());
+    String dataId = configuration.getString(DATA_ID);
+    Preconditions.checkArgument(DATA_SETS.containsKey(dataId),
+        "data-id %s does not found in registered data set.", dataId);
+
+    return new BoundedTableSource(DATA_SETS.get(dataId), tableSchema);
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return "BoundedSource";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return ImmutableSet.of(DATA_ID);
+  }
+
+  private static class BoundedTableSource implements ScanTableSource {
+
+    private final List<List<Row>> elementsPerCheckpoint;
+    private final TableSchema tableSchema;
+
+    private BoundedTableSource(List<List<Row>> elementsPerCheckpoint, TableSchema tableSchema) {
+      this.elementsPerCheckpoint = elementsPerCheckpoint;
+      this.tableSchema = tableSchema;
+    }
+
+    private BoundedTableSource(BoundedTableSource toCopy) {
+      this.elementsPerCheckpoint = toCopy.elementsPerCheckpoint;
+      this.tableSchema = toCopy.tableSchema;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+      return ChangelogMode.newBuilder()
+          .addContainedKind(RowKind.INSERT)
+          .addContainedKind(RowKind.DELETE)
+          .addContainedKind(RowKind.UPDATE_BEFORE)
+          .addContainedKind(RowKind.UPDATE_AFTER)
+          .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+      return new DataStreamScanProvider() {
+        @Override
+        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
+          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);

Review comment:
       Let's say `checkpointToAwait = numCheckpointsComplete.get() + delta`,   in fact the value of `delta` should not affect the final table records because we only need to make sure that there will be exactly `elementsPerCheckpoint.size()`  checkpoints to emit each records buffer from the original `elementsPerCheckpoint`.   Even if the checkpoints that emitted results are not continuous, the correctness of the data should not be affected in the end.   Setting the `delta` to be 2 is introduce the variable that produce un-continuous checkpoints. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631589930



##########
File path: flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
##########
@@ -209,9 +209,18 @@ public static StructLikeSet expectedRowSet(Table table, Record... records) {
   }
 
   public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException {
+    table.refresh();

Review comment:
       Yeah, I like you idea ! 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r632140150



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class BoundedTableFactory implements DynamicTableSourceFactory {
+  private static final AtomicInteger DATA_SET_ID = new AtomicInteger(0);
+  private static final Map<String, List<List<Row>>> DATA_SETS = new HashMap<>();
+
+  private static final ConfigOption<String> DATA_ID = ConfigOptions.key("data-id").stringType().noDefaultValue();
+
+  public static String registerDataSet(List<List<Row>> dataSet) {
+    String dataSetId = String.valueOf(DATA_SET_ID.incrementAndGet());
+    DATA_SETS.put(dataSetId, dataSet);
+    return dataSetId;
+  }
+
+  public static void clearDataSets() {
+    DATA_SETS.clear();
+  }
+
+  @Override
+  public DynamicTableSource createDynamicTableSource(Context context) {
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+    Configuration configuration = Configuration.fromMap(context.getCatalogTable().getOptions());
+    String dataId = configuration.getString(DATA_ID);
+    Preconditions.checkArgument(DATA_SETS.containsKey(dataId),
+        "data-id %s does not found in registered data set.", dataId);
+
+    return new BoundedTableSource(DATA_SETS.get(dataId), tableSchema);
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return "BoundedSource";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return ImmutableSet.of(DATA_ID);
+  }
+
+  private static class BoundedTableSource implements ScanTableSource {
+
+    private final List<List<Row>> elementsPerCheckpoint;
+    private final TableSchema tableSchema;
+
+    private BoundedTableSource(List<List<Row>> elementsPerCheckpoint, TableSchema tableSchema) {
+      this.elementsPerCheckpoint = elementsPerCheckpoint;
+      this.tableSchema = tableSchema;
+    }
+
+    private BoundedTableSource(BoundedTableSource toCopy) {
+      this.elementsPerCheckpoint = toCopy.elementsPerCheckpoint;
+      this.tableSchema = toCopy.tableSchema;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+      return ChangelogMode.newBuilder()
+          .addContainedKind(RowKind.INSERT)
+          .addContainedKind(RowKind.DELETE)
+          .addContainedKind(RowKind.UPDATE_BEFORE)
+          .addContainedKind(RowKind.UPDATE_AFTER)
+          .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+      return new DataStreamScanProvider() {
+        @Override
+        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
+          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);

Review comment:
       so we want the pattern of checkpoint cycles like this? Only small downside is slightly longer runtime. Now we need to wait for 6 checkpoints instead of 3.
   ```
   insert batch 1, idle, insert batch 2, idle, insert batch 3, idle, ...
   ```
   
   Regardless, it is helpful to add some comments to explain the decision. It is very difficult to know the intention of the magic number of 2.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631601167



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class BoundedTableFactory implements DynamicTableSourceFactory {
+  private static final AtomicInteger DATA_SET_ID = new AtomicInteger(0);
+  private static final Map<String, List<List<Row>>> DATA_SETS = new HashMap<>();
+
+  private static final ConfigOption<String> DATA_ID = ConfigOptions.key("data-id").stringType().noDefaultValue();
+
+  public static String registerDataSet(List<List<Row>> dataSet) {
+    String dataSetId = String.valueOf(DATA_SET_ID.incrementAndGet());
+    DATA_SETS.put(dataSetId, dataSet);
+    return dataSetId;
+  }
+
+  public static void clearDataSets() {
+    DATA_SETS.clear();
+  }
+
+  @Override
+  public DynamicTableSource createDynamicTableSource(Context context) {
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+    Configuration configuration = Configuration.fromMap(context.getCatalogTable().getOptions());
+    String dataId = configuration.getString(DATA_ID);
+    Preconditions.checkArgument(DATA_SETS.containsKey(dataId),
+        "data-id %s does not found in registered data set.", dataId);
+
+    return new BoundedTableSource(DATA_SETS.get(dataId), tableSchema);
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return "BoundedSource";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return ImmutableSet.of(DATA_ID);
+  }
+
+  private static class BoundedTableSource implements ScanTableSource {
+
+    private final List<List<Row>> elementsPerCheckpoint;
+    private final TableSchema tableSchema;
+
+    private BoundedTableSource(List<List<Row>> elementsPerCheckpoint, TableSchema tableSchema) {
+      this.elementsPerCheckpoint = elementsPerCheckpoint;
+      this.tableSchema = tableSchema;
+    }
+
+    private BoundedTableSource(BoundedTableSource toCopy) {
+      this.elementsPerCheckpoint = toCopy.elementsPerCheckpoint;
+      this.tableSchema = toCopy.tableSchema;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+      return ChangelogMode.newBuilder()
+          .addContainedKind(RowKind.INSERT)
+          .addContainedKind(RowKind.DELETE)
+          .addContainedKind(RowKind.UPDATE_BEFORE)
+          .addContainedKind(RowKind.UPDATE_AFTER)
+          .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+      return new DataStreamScanProvider() {
+        @Override
+        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
+          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);

Review comment:
       Let's say `checkpointToAwait = numCheckpointsComplete.get() + delta`,   in fact the value of `delta` should not affect the final table records because we only need to make sure that there will be exactly `elementsPerCheckpoint.size()`  checkpoints to emit each records buffer from the original `elementsPerCheckpoint`.   Even if the checkpoints that emitted results are not continuous, the correctness of the data should not be affected in the end.   Setting the `delta` to be 2 is introducing the variable that produce un-continuous checkpoints that emit the records buffer from `elementsPerCheckpoints`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-812463051


   If we want to sync deletions from mysql binlog into apache iceberg table, then we will need to upgrade the iceberg table from version 1 to version 2 by following code as we don't expose the v2 to end users now. 
   
   ```java
   import java.util.Map;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.BaseTable;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.TableMetadata;
   import org.apache.iceberg.TableOperations;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.relocated.com.google.common.collect.Maps;
   
   public class TestIceberg {
   
     private TestIceberg() {
     }
   
     public static void main(String[] args) {
   
       Map<String, String> properties = Maps.newHashMap();
       properties.put("type", "iceberg");
       properties.put("catalog-type", "hive");
       properties.put("uri", "thrift://localhost:9083");
       properties.put("clients", "5");
       properties.put("property-version", "1");
       properties.put("warehouse", "file:///Users/openinx/test/iceberg-warehouse");
   
       CatalogLoader loader = CatalogLoader.hive("hive_catalog", new Configuration(), properties);
       Table table = loader.loadCatalog().loadTable(TableIdentifier.of("mysql_db", "iceberg_sbtest1"));
   
       TableOperations ops = ((BaseTable) table).operations();
       TableMetadata meta = ops.current();
       ops.commit(meta, meta.upgradeToFormatVersion(2));
     }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r629007089



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
+import org.apache.iceberg.flink.source.ChangeLogTableTestBase;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestChangeLogTable extends ChangeLogTableTestBase {

Review comment:
       it seems there could be many combinations of changelog sequence. how do we know that we covered the meaningful combinations?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r628997760



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/ChangeLogTableTestBase.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.flink.FlinkTestBase;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+public class ChangeLogTableTestBase extends FlinkTestBase {
+  private volatile TableEnvironment tEnv = null;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @After
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s", name.getMethodName());
+    BoundedTableFactory.clearDataSets();
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          EnvironmentSettings settings = EnvironmentSettings
+              .newInstance()
+              .useBlinkPlanner()
+              .inStreamingMode()
+              .build();
+
+          StreamExecutionEnvironment env = StreamExecutionEnvironment
+              .getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .enableCheckpointing(400)
+              .setMaxParallelism(1)
+              .setParallelism(1);
+
+          tEnv = StreamTableEnvironment.create(env, settings);
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  private static final Map<String, RowKind> ROW_KIND_MAP = ImmutableMap.of(
+      "+I", RowKind.INSERT,

Review comment:
       instead of having these strings mapping with a single method `row`, which results in a ton of static strings in tests, I think we can use method based approach and have 4 methods `insertRow`, `deleteRow`, `updateRowBefore`, `updateRowAfter` because there are only 4 cases and we don't really expect the case to grow.
   
   The methods can use signature like `insertRow(Object... values)` just like the `Row.ofKind` method to be more flexible.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631576870



##########
File path: flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
##########
@@ -209,9 +209,18 @@ public static StructLikeSet expectedRowSet(Table table, Record... records) {
   }
 
   public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException {
+    table.refresh();

Review comment:
       got it. I just found it a little weird that we have to call `table.refresh` twice in this case.  I think your intention is to support `actualRowSet` for either latest snapshot or specific snapshotId. maybe define the overloaded method as below. Then this method can just pass in null for the `snapshotId`
   ```
   actualRowSet(Table table, @Nullable Long snapshotId, String... columns) 
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx edited a comment on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx edited a comment on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-812463051


   If we want to sync deletions from mysql binlog into apache iceberg table, then we will need to upgrade the iceberg table from version 1 to version 2 by following code as we don't expose the v2 to end users now. 
   
   ```java
   import java.util.Map;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.BaseTable;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.TableMetadata;
   import org.apache.iceberg.TableOperations;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.relocated.com.google.common.collect.Maps;
   
   public class TestIceberg {
   
     private TestIceberg() {
     }
   
     public static void main(String[] args) {
   
       Map<String, String> properties = Maps.newHashMap();
       properties.put("type", "iceberg");
       properties.put("catalog-type", "hive");
       properties.put("uri", "thrift://localhost:9083");
       properties.put("clients", "5");
       properties.put("property-version", "1");
       properties.put("warehouse", "file:///Users/openinx/test/iceberg-warehouse");
   
       CatalogLoader loader = CatalogLoader.hive("hive_catalog", new Configuration(), properties);
       Table table = loader.loadCatalog().loadTable(TableIdentifier.of("mysql_db", "iceberg_sbtest1"));
   
       TableOperations ops = ((BaseTable) table).operations();
       TableMetadata meta = ops.current();
       ops.commit(meta, meta.upgradeToFormatVersion(2));
     }
   }
   ```
   
   Then we could use the `sysbench` to update the records in mysql server: 
   
   ```
   sysbench --mysql-host=localhost \
       --mysql-user=<your-mysql-user> \
       --mysql-password=<your-mysql-password>  \
       --mysql-db=test \
       --threads=1 \
       --rate=52 \
       --time=1800 \
       --report-interval=10  \      
       oltp_update_index \
       --table_size=60000000 \
       --skip_trx=on \
       run
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631595977



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
+import org.apache.iceberg.flink.source.ChangeLogTableTestBase;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestChangeLogTable extends ChangeLogTableTestBase {

Review comment:
       Actually we cannot cover all the combinations in this unit test, but I think we could cover those critical cases in this class: 
   
   1.  The first dimension is the primary key columns (PK), we choose to cover the three cases: 
       a) `id` as PK ;
       b) `data` as PK;
       c) `id,data` as PK;
   The different primary key from different combinations will lead to different path to [generate the key of a row](https://github.com/apache/iceberg/blob/90225d6c9413016d611e2ce5eff37db1bc1b4fc5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L122).  We need to ensure that the data correctness  is not affected by the key combination.  
   
   2. The second dimension is:  What's the correct results after applying INSERT, DELETE, UPDATE inside a same transaction.   The same key may have different version of INSERT, DELETE, UPDATE.  We need to ensure that the various insert/delete/update combinations within a given txn will not affect the correctness.
   
   3.  The third dimension is: What's the correct results after applying INSERT/DELETE/UPDATE among different transaction.  We need to ensure that the various insert/delete/update combinations between different txn will not affect the correctness.
   
   This's the reason that why do we choose to design the following test cases. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] dixingxing0 commented on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-835163660


   Is anyone reviewing this PR, it seems that once this PR is merged it will be the first available version for the CDC stream writing, which will be of great help to iceberg users who want to use the upsert feature, personally i think this PR should be merged before 0.12 release.  @rdblue @stevenzwu @openinx 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r628987753



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -61,7 +65,22 @@ public static Schema convert(TableSchema schema) {
     RowType root = (RowType) schemaType;
     Type converted = root.accept(new FlinkTypeToType(root));
 
-    return new Schema(converted.asStructType().fields());
+    Schema iSchema = new Schema(converted.asStructType().fields());
+    return freshIdentifierFieldIds(iSchema, schema);
+  }
+
+  private static Schema freshIdentifierFieldIds(Schema iSchema, TableSchema schema) {
+    // Locate the identifier field id list.
+    Set<Integer> identifierFieldIds = Sets.newHashSet();
+    if (schema.getPrimaryKey().isPresent()) {
+      for (String column : schema.getPrimaryKey().get().getColumns()) {
+        Types.NestedField field = iSchema.findField(column);
+        Preconditions.checkNotNull(field, "Column %s does not found in schema %s", column, iSchema);

Review comment:
       nit: error message format should follow `Cannot ...`, such as `Cannot find field ID for primary key column %s in schema %s`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631518842



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
+import org.apache.iceberg.flink.source.ChangeLogTableTestBase;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestChangeLogTable extends ChangeLogTableTestBase {
+  private static final Configuration CONF = new Configuration();
+  private static final String SOURCE_TABLE = "default_catalog.default_database.source_change_logs";
+
+  private static final String CATALOG_NAME = "test_catalog";
+  private static final String DATABASE_NAME = "test_db";
+  private static final String TABLE_NAME = "test_table";
+  private static String warehouse;
+
+  private final boolean partitioned;
+
+  @Parameterized.Parameters(name = "PartitionedTable={0}")
+  public static Iterable<Object[]> parameters() {
+    return ImmutableList.of(
+        new Object[] {true},
+        new Object[] {false}
+    );
+  }
+
+  public TestChangeLogTable(boolean partitioned) {
+    this.partitioned = partitioned;
+  }
+
+  @BeforeClass
+  public static void createWarehouse() throws IOException {
+    File warehouseFile = TEMPORARY_FOLDER.newFolder();
+    Assert.assertTrue("The warehouse should be deleted", warehouseFile.delete());
+    warehouse = String.format("file:%s", warehouseFile);
+  }
+
+  @Before
+  public void before() {
+    sql("CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
+        CATALOG_NAME, warehouse);
+    sql("USE CATALOG %s", CATALOG_NAME);
+    sql("CREATE DATABASE %s", DATABASE_NAME);
+    sql("USE %s", DATABASE_NAME);
+  }
+
+  @After
+  @Override
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
+    sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
+    sql("DROP CATALOG IF EXISTS %s", CATALOG_NAME);
+    BoundedTableFactory.clearDataSets();
+  }
+
+  @Test
+  public void testSqlChangeLogOnIdKey() throws Exception {
+    List<List<Row>> inputRowsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa"),
+            row("-D", 2, "aaa"),
+            row("+I", 2, "bbb")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "bbb"),
+            row("+U", 2, "ccc"),
+            row("-D", 2, "ccc"),
+            row("+I", 2, "ddd")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 1, "ccc"),
+            row("-D", 1, "ccc"),
+            row("+I", 1, "ddd")
+        )
+    );
+
+    List<List<Record>> expectedRecordsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
+        ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
+        ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
+    );
+
+    testSqlChangeLog(TABLE_NAME, ImmutableList.of("id"), inputRowsPerCheckpoint,
+        expectedRecordsPerCheckpoint);
+  }
+
+  @Test
+  public void testChangeLogOnDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 2, "bbb"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "aaa"),
+            row("+U", 1, "ccc"),
+            row("+I", 1, "aaa")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 2, "aaa"),
+            row("+I", 2, "ccc")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
+        ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
+        ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))
+    );
+
+    testSqlChangeLog(TABLE_NAME, ImmutableList.of("data"), elementsPerCheckpoint, expectedRecords);
+  }
+
+  @Test
+  public void testChangeLogOnIdDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 2, "bbb"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "aaa"),
+            row("+U", 1, "ccc"),
+            row("+I", 1, "aaa")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 2, "aaa")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb"))
+    );
+
+    testSqlChangeLog(TABLE_NAME, ImmutableList.of("data", "id"), elementsPerCheckpoint, expectedRecords);
+  }
+
+  @Test
+  public void testPureInsertOnIdKey() throws Exception {

Review comment:
       Because I was trying to cover the case that only write inserts in the format v2 (without appending any deletes - pos-deletes or eq-deletes into the [posDeleteWriter](https://github.com/apache/iceberg/blob/90225d6c9413016d611e2ce5eff37db1bc1b4fc5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L109) and [eqDeleteWriter](https://github.com/apache/iceberg/blob/90225d6c9413016d611e2ce5eff37db1bc1b4fc5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L108)).  The process of writing pure inserts in v2 (with identifier field id list) is actually different with the process in v1 or the process without identifier field id list in v2 because we need to maintain the [insertedRowMap](https://github.com/apache/iceberg/blob/90225d6c9413016d611e2ce5eff37db1bc1b4fc5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L125) to deduplicate the the same keys in the same txn.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r628996025



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
+import org.apache.iceberg.flink.source.ChangeLogTableTestBase;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestChangeLogTable extends ChangeLogTableTestBase {
+  private static final Configuration CONF = new Configuration();
+  private static final String SOURCE_TABLE = "default_catalog.default_database.source_change_logs";
+
+  private static final String CATALOG_NAME = "test_catalog";
+  private static final String DATABASE_NAME = "test_db";
+  private static final String TABLE_NAME = "test_table";
+  private static String warehouse;
+
+  private final boolean partitioned;
+
+  @Parameterized.Parameters(name = "PartitionedTable={0}")
+  public static Iterable<Object[]> parameters() {
+    return ImmutableList.of(
+        new Object[] {true},
+        new Object[] {false}
+    );
+  }
+
+  public TestChangeLogTable(boolean partitioned) {
+    this.partitioned = partitioned;
+  }
+
+  @BeforeClass
+  public static void createWarehouse() throws IOException {
+    File warehouseFile = TEMPORARY_FOLDER.newFolder();
+    Assert.assertTrue("The warehouse should be deleted", warehouseFile.delete());
+    warehouse = String.format("file:%s", warehouseFile);
+  }
+
+  @Before
+  public void before() {
+    sql("CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
+        CATALOG_NAME, warehouse);
+    sql("USE CATALOG %s", CATALOG_NAME);
+    sql("CREATE DATABASE %s", DATABASE_NAME);
+    sql("USE %s", DATABASE_NAME);
+  }
+
+  @After
+  @Override
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
+    sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
+    sql("DROP CATALOG IF EXISTS %s", CATALOG_NAME);
+    BoundedTableFactory.clearDataSets();
+  }
+
+  @Test
+  public void testSqlChangeLogOnIdKey() throws Exception {
+    List<List<Row>> inputRowsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa"),
+            row("-D", 2, "aaa"),
+            row("+I", 2, "bbb")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "bbb"),
+            row("+U", 2, "ccc"),
+            row("-D", 2, "ccc"),
+            row("+I", 2, "ddd")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 1, "ccc"),
+            row("-D", 1, "ccc"),
+            row("+I", 1, "ddd")
+        )
+    );
+
+    List<List<Record>> expectedRecordsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
+        ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
+        ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
+    );
+
+    testSqlChangeLog(TABLE_NAME, ImmutableList.of("id"), inputRowsPerCheckpoint,
+        expectedRecordsPerCheckpoint);
+  }
+
+  @Test
+  public void testChangeLogOnDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 2, "bbb"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "aaa"),
+            row("+U", 1, "ccc"),
+            row("+I", 1, "aaa")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 2, "aaa"),
+            row("+I", 2, "ccc")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
+        ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
+        ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))
+    );
+
+    testSqlChangeLog(TABLE_NAME, ImmutableList.of("data"), elementsPerCheckpoint, expectedRecords);
+  }
+
+  @Test
+  public void testChangeLogOnIdDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 2, "bbb"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "aaa"),
+            row("+U", 1, "ccc"),
+            row("+I", 1, "aaa")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 2, "aaa")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb"))
+    );
+
+    testSqlChangeLog(TABLE_NAME, ImmutableList.of("data", "id"), elementsPerCheckpoint, expectedRecords);
+  }
+
+  @Test
+  public void testPureInsertOnIdKey() throws Exception {

Review comment:
       why do we need an insert only test? The tests above should be able to convert this case. If we really want to test comprehensively, I think we should also have tests for insert + delete only and insert + update only.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] dixingxing0 commented on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
dixingxing0 commented on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-835182991


   > @dixingxing0 yeah I think this should definitely be a part of 0.12.0 milestone. It's late night at my timezone, I am planning to review tomorrow.
   
   Thanks  @jackye1995, glad to hear that!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r632134242



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
+import org.apache.iceberg.flink.source.ChangeLogTableTestBase;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestChangeLogTable extends ChangeLogTableTestBase {

Review comment:
       thx. might be helpful to add those in comments to give readers some context on the unit test design




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx merged pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx merged pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd edited a comment on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
edgarRd edited a comment on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-812575674


   @openinx Would it be possible to set the base branch to `jackye1995:row-id-api` for now to see the diff that applies to your branch only? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r628996025



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
+import org.apache.iceberg.flink.source.ChangeLogTableTestBase;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestChangeLogTable extends ChangeLogTableTestBase {
+  private static final Configuration CONF = new Configuration();
+  private static final String SOURCE_TABLE = "default_catalog.default_database.source_change_logs";
+
+  private static final String CATALOG_NAME = "test_catalog";
+  private static final String DATABASE_NAME = "test_db";
+  private static final String TABLE_NAME = "test_table";
+  private static String warehouse;
+
+  private final boolean partitioned;
+
+  @Parameterized.Parameters(name = "PartitionedTable={0}")
+  public static Iterable<Object[]> parameters() {
+    return ImmutableList.of(
+        new Object[] {true},
+        new Object[] {false}
+    );
+  }
+
+  public TestChangeLogTable(boolean partitioned) {
+    this.partitioned = partitioned;
+  }
+
+  @BeforeClass
+  public static void createWarehouse() throws IOException {
+    File warehouseFile = TEMPORARY_FOLDER.newFolder();
+    Assert.assertTrue("The warehouse should be deleted", warehouseFile.delete());
+    warehouse = String.format("file:%s", warehouseFile);
+  }
+
+  @Before
+  public void before() {
+    sql("CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
+        CATALOG_NAME, warehouse);
+    sql("USE CATALOG %s", CATALOG_NAME);
+    sql("CREATE DATABASE %s", DATABASE_NAME);
+    sql("USE %s", DATABASE_NAME);
+  }
+
+  @After
+  @Override
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
+    sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
+    sql("DROP CATALOG IF EXISTS %s", CATALOG_NAME);
+    BoundedTableFactory.clearDataSets();
+  }
+
+  @Test
+  public void testSqlChangeLogOnIdKey() throws Exception {
+    List<List<Row>> inputRowsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa"),
+            row("-D", 2, "aaa"),
+            row("+I", 2, "bbb")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "bbb"),
+            row("+U", 2, "ccc"),
+            row("-D", 2, "ccc"),
+            row("+I", 2, "ddd")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 1, "ccc"),
+            row("-D", 1, "ccc"),
+            row("+I", 1, "ddd")
+        )
+    );
+
+    List<List<Record>> expectedRecordsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
+        ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
+        ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
+    );
+
+    testSqlChangeLog(TABLE_NAME, ImmutableList.of("id"), inputRowsPerCheckpoint,
+        expectedRecordsPerCheckpoint);
+  }
+
+  @Test
+  public void testChangeLogOnDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 2, "bbb"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "aaa"),
+            row("+U", 1, "ccc"),
+            row("+I", 1, "aaa")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 2, "aaa"),
+            row("+I", 2, "ccc")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
+        ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
+        ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))
+    );
+
+    testSqlChangeLog(TABLE_NAME, ImmutableList.of("data"), elementsPerCheckpoint, expectedRecords);
+  }
+
+  @Test
+  public void testChangeLogOnIdDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 1, "aaa"),
+            row("+I", 2, "bbb"),
+            row("+I", 1, "bbb"),
+            row("+I", 2, "aaa")
+        ),
+        ImmutableList.of(
+            row("-U", 2, "aaa"),
+            row("+U", 1, "ccc"),
+            row("+I", 1, "aaa")
+        ),
+        ImmutableList.of(
+            row("-D", 1, "bbb"),
+            row("+I", 2, "aaa")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb"))
+    );
+
+    testSqlChangeLog(TABLE_NAME, ImmutableList.of("data", "id"), elementsPerCheckpoint, expectedRecords);
+  }
+
+  @Test
+  public void testPureInsertOnIdKey() throws Exception {

Review comment:
       why do we need an insert only test? The tests above should be able to convert this case. If we really want to test comprehensively, I think we should also have tests for insert + delete only and insert + update (before only, after only, before + after) only.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r628982669



##########
File path: flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
##########
@@ -209,9 +209,18 @@ public static StructLikeSet expectedRowSet(Table table, Record... records) {
   }
 
   public static StructLikeSet actualRowSet(Table table, String... columns) throws IOException {
+    table.refresh();

Review comment:
       the method below already calls `table.refresh()`. seems redundant here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r629010553



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class BoundedTableFactory implements DynamicTableSourceFactory {
+  private static final AtomicInteger DATA_SET_ID = new AtomicInteger(0);
+  private static final Map<String, List<List<Row>>> DATA_SETS = new HashMap<>();
+
+  private static final ConfigOption<String> DATA_ID = ConfigOptions.key("data-id").stringType().noDefaultValue();
+
+  public static String registerDataSet(List<List<Row>> dataSet) {
+    String dataSetId = String.valueOf(DATA_SET_ID.incrementAndGet());
+    DATA_SETS.put(dataSetId, dataSet);
+    return dataSetId;
+  }
+
+  public static void clearDataSets() {
+    DATA_SETS.clear();
+  }
+
+  @Override
+  public DynamicTableSource createDynamicTableSource(Context context) {
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+    Configuration configuration = Configuration.fromMap(context.getCatalogTable().getOptions());
+    String dataId = configuration.getString(DATA_ID);
+    Preconditions.checkArgument(DATA_SETS.containsKey(dataId),
+        "data-id %s does not found in registered data set.", dataId);
+
+    return new BoundedTableSource(DATA_SETS.get(dataId), tableSchema);
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return "BoundedSource";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return ImmutableSet.of(DATA_ID);
+  }
+
+  private static class BoundedTableSource implements ScanTableSource {
+
+    private final List<List<Row>> elementsPerCheckpoint;
+    private final TableSchema tableSchema;
+
+    private BoundedTableSource(List<List<Row>> elementsPerCheckpoint, TableSchema tableSchema) {
+      this.elementsPerCheckpoint = elementsPerCheckpoint;
+      this.tableSchema = tableSchema;
+    }
+
+    private BoundedTableSource(BoundedTableSource toCopy) {
+      this.elementsPerCheckpoint = toCopy.elementsPerCheckpoint;
+      this.tableSchema = toCopy.tableSchema;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+      return ChangelogMode.newBuilder()
+          .addContainedKind(RowKind.INSERT)
+          .addContainedKind(RowKind.DELETE)
+          .addContainedKind(RowKind.UPDATE_BEFORE)
+          .addContainedKind(RowKind.UPDATE_AFTER)
+          .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+      return new DataStreamScanProvider() {
+        @Override
+        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
+          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);

Review comment:
       I have a question for the `BoundedTestSource` line 66. Why is it `+2`?
   
   ```
   checkpointToAwait = numCheckpointsComplete.get() + 2;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#issuecomment-843037824


   We got one +1 from apache flink expert (@stevenzwu ) and  +1 from iceberg community (@jackye1995 ) , and there's no other available iceberg committers with good flink+iceberg background now,  so I've got this PR merged in case of blocking this feature too long.   If anyone has other concern,  we could discuss in the next PR. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r628989589



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -61,7 +65,22 @@ public static Schema convert(TableSchema schema) {
     RowType root = (RowType) schemaType;
     Type converted = root.accept(new FlinkTypeToType(root));
 
-    return new Schema(converted.asStructType().fields());
+    Schema iSchema = new Schema(converted.asStructType().fields());
+    return freshIdentifierFieldIds(iSchema, schema);
+  }
+
+  private static Schema freshIdentifierFieldIds(Schema iSchema, TableSchema schema) {

Review comment:
       instead of having this method, if we expose a `TypeUtil.freshIdentifierFieldIds(Schema iSchema, Schema base)`, can we do a conversion of Flink to Iceberg schema and then call that method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r631601167



##########
File path: flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class BoundedTableFactory implements DynamicTableSourceFactory {
+  private static final AtomicInteger DATA_SET_ID = new AtomicInteger(0);
+  private static final Map<String, List<List<Row>>> DATA_SETS = new HashMap<>();
+
+  private static final ConfigOption<String> DATA_ID = ConfigOptions.key("data-id").stringType().noDefaultValue();
+
+  public static String registerDataSet(List<List<Row>> dataSet) {
+    String dataSetId = String.valueOf(DATA_SET_ID.incrementAndGet());
+    DATA_SETS.put(dataSetId, dataSet);
+    return dataSetId;
+  }
+
+  public static void clearDataSets() {
+    DATA_SETS.clear();
+  }
+
+  @Override
+  public DynamicTableSource createDynamicTableSource(Context context) {
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+
+    Configuration configuration = Configuration.fromMap(context.getCatalogTable().getOptions());
+    String dataId = configuration.getString(DATA_ID);
+    Preconditions.checkArgument(DATA_SETS.containsKey(dataId),
+        "data-id %s does not found in registered data set.", dataId);
+
+    return new BoundedTableSource(DATA_SETS.get(dataId), tableSchema);
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return "BoundedSource";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return ImmutableSet.of(DATA_ID);
+  }
+
+  private static class BoundedTableSource implements ScanTableSource {
+
+    private final List<List<Row>> elementsPerCheckpoint;
+    private final TableSchema tableSchema;
+
+    private BoundedTableSource(List<List<Row>> elementsPerCheckpoint, TableSchema tableSchema) {
+      this.elementsPerCheckpoint = elementsPerCheckpoint;
+      this.tableSchema = tableSchema;
+    }
+
+    private BoundedTableSource(BoundedTableSource toCopy) {
+      this.elementsPerCheckpoint = toCopy.elementsPerCheckpoint;
+      this.tableSchema = toCopy.tableSchema;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+      return ChangelogMode.newBuilder()
+          .addContainedKind(RowKind.INSERT)
+          .addContainedKind(RowKind.DELETE)
+          .addContainedKind(RowKind.UPDATE_BEFORE)
+          .addContainedKind(RowKind.UPDATE_AFTER)
+          .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+      return new DataStreamScanProvider() {
+        @Override
+        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
+          SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);

Review comment:
       Let's say `checkpointToAwait = numCheckpointsComplete.get() + delta`,   in fact the value of `delta` should not affect the final table records because we only need to make sure that there will be exactly `elementsPerCheckpoint.size()`  checkpoints to emit each records buffer from the original `elementsPerCheckpoint`.   Even if the checkpoints that emitted results are not continuous, the correctness of the data should not be affected in the end.   Setting the `delta` to be 2 is introducing the variable that produce un-continuous checkpoints. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2410: Flink: Support SQL primary key

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2410:
URL: https://github.com/apache/iceberg/pull/2410#discussion_r628996798



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
##########
@@ -121,6 +126,40 @@ public void testCreateTable() throws TableNotExistException {
     Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
   }
 
+  @Test
+  public void testCreateTableWithPrimaryKey() throws Exception {

Review comment:
       is there an update primary key syntax in flink?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org