Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/17 12:05:50 UTC

[GitHub] [iceberg] hililiwei opened a new pull request, #6610: Flink: Port Support inspecting metadata table to Flink 1.14 & 1.15

hililiwei opened a new pull request, #6610:
URL: https://github.com/apache/iceberg/pull/6610

   ```
   ➜  iceberg git:(inspecting2) ✗ git diff --no-index  flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/ flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/
    import org.apache.flink.table.api.constraints.UniqueConstraint;
    import org.apache.flink.table.connector.ChangelogMode;
   +import org.apache.flink.table.connector.ProviderContext;
    import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
    import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
   +import org.apache.flink.table.data.RowData;
    import org.apache.flink.types.RowKind;
    import org.apache.flink.util.Preconditions;
    import org.apache.iceberg.flink.sink.FlinkSink;
   @@ -69,16 +73,20 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
        List<String> equalityColumns =
            tableSchema.getPrimaryKey().map(UniqueConstraint::getColumns).orElseGet(ImmutableList::of);
   
   -    return (DataStreamSinkProvider)
   -        (providerContext, dataStream) ->
   -            FlinkSink.forRowData(dataStream)
   -                .tableLoader(tableLoader)
   -                .tableSchema(tableSchema)
   -                .equalityFieldColumns(equalityColumns)
   -                .overwrite(overwrite)
   -                .setAll(writeProps)
   -                .flinkConf(readableConfig)
   -                .append();
   +    return new DataStreamSinkProvider() {
   +      @Override
   +      public DataStreamSink<?> consumeDataStream(
   +          ProviderContext providerContext, DataStream<RowData> dataStream) {
   +        return FlinkSink.forRowData(dataStream)
   +            .tableLoader(tableLoader)
   +            .tableSchema(tableSchema)
   +            .equalityFieldColumns(equalityColumns)
   +            .overwrite(overwrite)
   +            .setAll(writeProps)
   +            .flinkConf(readableConfig)
   +            .append();
   +      }
   +    };
      }
   
      @Override
   diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   index d84e2cb70..b686a76c9 100644
   --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
   @@ -285,8 +285,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
            commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
          }
          continuousEmptyCheckpoints = 0;
   -    } else {
   -      LOG.info("Skipping committing empty checkpoint {}", checkpointId);
        }
      }
   ```
   
   The second difference exists because https://github.com/apache/iceberg/pull/6452 has not been ported to the older versions. @pvary
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu merged pull request #6610: Flink: Port Support inspecting metadata table to Flink 1.14 & 1.15

Posted by GitBox <gi...@apache.org>.
stevenzwu merged PR #6610:
URL: https://github.com/apache/iceberg/pull/6610



[GitHub] [iceberg] stevenzwu commented on pull request #6610: Flink: Port Support inspecting metadata table to Flink 1.14 & 1.15

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #6610:
URL: https://github.com/apache/iceberg/pull/6610#issuecomment-1385727800

   Thanks @hililiwei for the backport of PR #6222.

