You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "geonyeongkim (via GitHub)" <gi...@apache.org> on 2023/05/09 13:11:59 UTC

[GitHub] [iceberg] geonyeongkim opened a new issue, #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

geonyeongkim opened a new issue, #7568:
URL: https://github.com/apache/iceberg/issues/7568

   ### Query engine
   
   Flink
   
   ### Question
   
   Hello.
   
   Attempt to load from cdc to iceberg table using flink DataStream.
   
   By the way, **there are too many small files in kb units**, is there a way to solve this problem?
   
   **For example, collect as much kafka data as possible and write it.**
   
   Or is there a way to use **Flink DataStream and Flink Rewrite DataFiles Action together?**
   
   **For example, write 10 times and then RewriteDataFiles.**
   
   
   ```kotlin
    // Flink job: streams CDC change records for the CPDEP_TACCT_L table from Kafka
    // into an Iceberg copy-on-write table (Hadoop catalog on HDFS) via FlinkSink upsert.
    object CpdepTacctLCowBmtApp {

        // Kafka source topic carrying the CDC feed for this table.
        private const val topic = "pedw.cdc.RDWOWN.CPDEP_TACCT_L"
        private val bootstrapServers = KafkaConfigUtil.getBootstrapServers(KafkaClusterType.DP1)

        @JvmStatic
        fun main(args: Array<String>) {

            val env = StreamExecutionEnvironment.getExecutionEnvironment()
            // Checkpoint every 5 seconds.
            // NOTE(review): Iceberg's FlinkSink commits a snapshot per checkpoint, so a
            // short interval like this likely drives the small-file problem discussed in
            // this issue — confirm against the Iceberg Flink write documentation.
            env.enableCheckpointing(5000)

            // Kafka source: resumes from committed group offsets, falling back to
            // EARLIEST; records are decoded by a custom deserialization schema.
            val kafkaSource = KafkaSource.builder<CustomKafkaRecord>()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(CpdepTacctLCowBmtApp::class.java.name)
                .setClientIdPrefix(UUID.randomUUID().toString())
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setDeserializer(CustomKafkaRecordDeserializationSchema())
                .build()

            // Loads the target Iceberg table directly from its HDFS warehouse path
            // (Hadoop catalog, no metastore).
            val tableLoader = TableLoader.fromHadoopTable("hdfs:///user/geonyeong.kim/iceberg/cpdep_tacct_l/cow")

            // Parse each record's JSON payload into a CDC VO and convert its "after"
            // image into a positional RowData matching the TableSchema declared below.
            val input: DataStream<RowData> =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "iceberg-cow-bmt-source")
                    .filter { it.value != null }
                    .map { DpObjectMapper.readValue(it.value!!, CpdepTacctLCdcVO::class.java) }
                    .map { it.payload }
                    .map {
                        // 109 positional fields; the index order must stay in exact
                        // correspondence with the field order of the TableSchema below.
                        val row = GenericRowData(109)
                        if (it.op.equals("d")) {
                            // Delete event: only the row kind is set.
                            // NOTE(review): the equality key fields (0-3: ACNO, TX_DT,
                            // TX_SEQNO, DTL_TX_SEQNO) are left null here; presumably the
                            // CDC "before" image should populate them so the upsert sink
                            // can match the row to delete — verify.
                            row.rowKind = RowKind.DELETE
                        } else {
                            // Insert/update event: copy every column from the "after"
                            // image, converting all values to strings.
                            val vo = it.after!!
                            row.setField(0, StringData.fromString(vo.ACNO))
                            row.setField(1, StringData.fromString(vo.TX_DT))
                            row.setField(2, StringData.fromString("${vo.TX_SEQNO}"))
                            row.setField(3, StringData.fromString("${vo.DTL_TX_SEQNO}"))
                            row.setField(4, StringData.fromString("${vo.FRMW_CHNG_TMST}"))
                            row.setField(5, StringData.fromString(vo.TX_GUID))
                            row.setField(6, StringData.fromString(vo.SLIP_NO))
                            row.setField(7, StringData.fromString(vo.DEP_TX_STCD))
                            row.setField(8, StringData.fromString(vo.DEP_CANC_DVCD))
                            row.setField(9, StringData.fromString(vo.TX_TIME))
                            row.setField(10, StringData.fromString(vo.RECKN_DT))
                            row.setField(11, StringData.fromString(vo.SYS_DT))
                            row.setField(12, StringData.fromString(vo.SYS_TIME))
                            row.setField(13, StringData.fromString(vo.DEP_TRRC_KNCD))
                            row.setField(14, StringData.fromString(vo.TX_CHNL_DVCD))
                            row.setField(15, StringData.fromString(vo.TX_FRMN_CD))
                            row.setField(16, StringData.fromString(vo.TX_CD))
                            row.setField(17, StringData.fromString(vo.TX_BRCD))
                            row.setField(18, StringData.fromString(vo.TX_TLRNO))
                            row.setField(19, StringData.fromString(vo.ACCO_MGMT_BRCD))
                            row.setField(20, StringData.fromString(vo.XN_PB_DVCD))
                            row.setField(21, StringData.fromString(vo.CURN_CD))
                            row.setField(22, StringData.fromString(vo.ACCO_SBCD))
                            row.setField(23, StringData.fromString(vo.GDS_CD))
                            row.setField(24, StringData.fromString(vo.AFTR_BAL_CHNG_TX_YN))
                            row.setField(25, StringData.fromString(vo.TX_AMT))
                            row.setField(26, StringData.fromString(vo.AFTR_BAL))
                            row.setField(27, StringData.fromString(vo.CASH_AMT))
                            row.setField(28, StringData.fromString(vo.SMPL_TRFAMT))
                            row.setField(29, StringData.fromString(vo.TOT_CBPC_AMT))
                            row.setField(30, StringData.fromString(vo.ITLK_ALTR_AMT))
                            row.setField(31, StringData.fromString(vo.CSCHQ_ISSU_AMT))
                            row.setField(32, StringData.fromString(vo.CSCHQ_PRVS_AMT))
                            row.setField(33, StringData.fromString(vo.OJBB_MNRC_AMT))
                            row.setField(34, StringData.fromString(vo.OBNK_MNRC_AMT))
                            row.setField(35, StringData.fromString(vo.DEP_TRRC_CBPC_CD))
                            row.setField(36, StringData.fromString(vo.DRAF_CHQ_NO))
                            row.setField(37, StringData.fromString(vo.CBPC_AMT))
                            row.setField(38, StringData.fromString(vo.FEE_PRFR_RSCD))
                            row.setField(39, StringData.fromString(vo.DEP_FEE_RCV_DVCD))
                            row.setField(40, StringData.fromString("${vo.FEE_TX_SEQNO}"))
                            row.setField(41, StringData.fromString(vo.INSTL_MM))
                            row.setField(42, StringData.fromString("${vo.INSTL_NTH}"))
                            row.setField(43, StringData.fromString("${vo.OVRD_PPAY_DAYS}"))
                            row.setField(44, StringData.fromString(vo.CNGT_MSG_CD))
                            row.setField(45, StringData.fromString(vo.BBPR_CTNT))
                            row.setField(46, StringData.fromString(vo.PB_ARR_TRGT_YN))
                            row.setField(47, StringData.fromString(vo.PSWD_NSTUP_PRVS_YN))
                            row.setField(48, StringData.fromString(vo.NPSWD_RSCD))
                            row.setField(49, StringData.fromString(vo.ACCO_UZ_DVCD))
                            row.setField(50, StringData.fromString(vo.INPUT_ACNO))
                            row.setField(51, StringData.fromString(vo.BTCH_NO))
                            row.setField(52, StringData.fromString(vo.AFCL_TX_YN))
                            row.setField(53, StringData.fromString(vo.PRCG_APRV_TLRNO))
                            row.setField(54, StringData.fromString(vo.ITLK_TX_DVCD))
                            row.setField(55, StringData.fromString(vo.DEP_TRRC_TYCD))
                            row.setField(56, StringData.fromString(vo.RBF_SRVC_ID))
                            row.setField(57, StringData.fromString(vo.REAL_ACCP_AMT))
                            row.setField(58, StringData.fromString(vo.INT_CCAM))
                            row.setField(59, StringData.fromString(vo.UNPAID_BAL))
                            row.setField(60, StringData.fromString(vo.FRUP_BAL_TX_DT))
                            row.setField(61, StringData.fromString(vo.BNCF_TX_ANNT))
                            row.setField(62, StringData.fromString(vo.BNCF_AFTR_ANNT))
                            row.setField(63, StringData.fromString(vo.SLE_PAY_AMT))
                            row.setField(64, StringData.fromString(vo.MDCL_FEE))
                            row.setField(65, StringData.fromString(vo.BNLN_INT))
                            row.setField(66, StringData.fromString(vo.FX_ITLK_TX_NO))
                            row.setField(67, StringData.fromString(vo.FX_PRFR_XRT_APPC_NO))
                            row.setField(68, StringData.fromString(vo.FX_APLY_XRT))
                            row.setField(69, StringData.fromString("${vo.FX_XRT_ANOUN_NTH}"))
                            row.setField(70, StringData.fromString(vo.BOK_RPT_RSCD))
                            row.setField(71, StringData.fromString(vo.EXPRT_CPT_YN))
                            row.setField(72, StringData.fromString(vo.WCUC_AMT))
                            row.setField(73, StringData.fromString(vo.WCUC_SMPL_TRFAMT))
                            row.setField(74, StringData.fromString(vo.WCUC_ITLK_TRFAMT))
                            row.setField(75, StringData.fromString(vo.WCUC_CASH_AMT))
                            row.setField(76, StringData.fromString(vo.CNTR_MOVE_FRST_PYMN_YN))
                            row.setField(77, StringData.fromString(vo.SCFC_TCFND_BZWK_DVCD))
                            row.setField(78, StringData.fromString(vo.SCFC_TCFND_MESG_ID))
                            row.setField(79, StringData.fromString(vo.SCFC_TCFND_PL_INT))
                            row.setField(80, StringData.fromString(vo.MNRC_RQER_RLNM_DVCD))
                            row.setField(81, StringData.fromString(vo.MNRC_RQER_RNNO))
                            row.setField(82, StringData.fromString(vo.MNRC_RQER_NM))
                            row.setField(83, StringData.fromString(vo.CMS_CD))
                            row.setField(84, StringData.fromString(vo.CMS_RCPR_NM))
                            row.setField(85, StringData.fromString(vo.ORTR_DT))
                            row.setField(86, StringData.fromString("${vo.ORTR_SEQNO}"))
                            row.setField(87, StringData.fromString(vo.INT_CLCL_DTL_HIST_CRTN_YN))
                            row.setField(88, StringData.fromString(vo.SEIZ_DMAN_MGMT_NO))
                            row.setField(89, StringData.fromString(vo.AIQRY_CSCHQ_AMT))
                            row.setField(90, StringData.fromString(vo.OSDCH_TX_UNQ_NO))
                            row.setField(91, StringData.fromString(vo.BANK_GIRCD))
                            row.setField(92, StringData.fromString(vo.GDS_DTLS_CD))
                            row.setField(93, StringData.fromString(vo.WCUC_DEAL_PFLS_AMT))
                            row.setField(94, StringData.fromString(vo.WCUC_ANTT_AMT))
                            row.setField(95, StringData.fromString(vo.DMAN_NO))
                            row.setField(96, StringData.fromString(vo.TX_RSN))
                            row.setField(97, StringData.fromString(vo.FUND_ITMS_CD))
                            row.setField(98, StringData.fromString(vo.MDCL_CTRINT))
                            row.setField(99, StringData.fromString(vo.ITLK_TX_LDNG_YN))
                            row.setField(100, StringData.fromString(vo.FRST_TRNM_IPAD))
                            row.setField(101, StringData.fromString(vo.GUID))
                            row.setField(102, StringData.fromString(vo.SYS_FRST_REG_DTTM))
                            row.setField(103, StringData.fromString(vo.SYS_FRST_REG_EMPNO))
                            row.setField(104, StringData.fromString(vo.SYS_LAST_CHNG_DTTM))
                            row.setField(105, StringData.fromString(vo.SYS_LAST_CHNG_EMPNO))
                            row.setField(106, StringData.fromString(vo.TRSF_BANK_CD))
                            row.setField(107, StringData.fromString(vo.TRSF_ACNO))
                            row.setField(108, StringData.fromString(vo.TRSF_ACCO_DEPR_NM))
                        }
                        row
                    }

            // Upsert sink keyed on (ACNO, TX_DT, TX_SEQNO, DTL_TX_SEQNO), copy-on-write
            // for delete/update/merge, with a 1 GiB target data-file size. The schema
            // field order below defines the positional indices used in the map above.
            FlinkSink.forRowData(input)
                .tableLoader(tableLoader)
                .tableSchema(
                    TableSchema.builder()
                        .field("ACNO", DataTypes.STRING().notNull())
                        .field("TX_DT", DataTypes.STRING().notNull())
                        .field("TX_SEQNO", DataTypes.STRING().notNull())
                        .field("DTL_TX_SEQNO", DataTypes.STRING().notNull())
                        .field("FRMW_CHNG_TMST", DataTypes.STRING())
                        .field("TX_GUID", DataTypes.STRING())
                        .field("SLIP_NO", DataTypes.STRING())
                        .field("DEP_TX_STCD", DataTypes.STRING())
                        .field("DEP_CANC_DVCD", DataTypes.STRING())
                        .field("TX_TIME", DataTypes.STRING())
                        .field("RECKN_DT", DataTypes.STRING())
                        .field("SYS_DT", DataTypes.STRING())
                        .field("SYS_TIME", DataTypes.STRING())
                        .field("DEP_TRRC_KNCD", DataTypes.STRING())
                        .field("TX_CHNL_DVCD", DataTypes.STRING())
                        .field("TX_FRMN_CD", DataTypes.STRING())
                        .field("TX_CD", DataTypes.STRING())
                        .field("TX_BRCD", DataTypes.STRING())
                        .field("TX_TLRNO", DataTypes.STRING())
                        .field("ACCO_MGMT_BRCD", DataTypes.STRING())
                        .field("XN_PB_DVCD", DataTypes.STRING())
                        .field("CURN_CD", DataTypes.STRING())
                        .field("ACCO_SBCD", DataTypes.STRING())
                        .field("GDS_CD", DataTypes.STRING())
                        .field("AFTR_BAL_CHNG_TX_YN", DataTypes.STRING())
                        .field("TX_AMT", DataTypes.STRING())
                        .field("AFTR_BAL", DataTypes.STRING())
                        .field("CASH_AMT", DataTypes.STRING())
                        .field("SMPL_TRFAMT", DataTypes.STRING())
                        .field("TOT_CBPC_AMT", DataTypes.STRING())
                        .field("ITLK_ALTR_AMT", DataTypes.STRING())
                        .field("CSCHQ_ISSU_AMT", DataTypes.STRING())
                        .field("CSCHQ_PRVS_AMT", DataTypes.STRING())
                        .field("OJBB_MNRC_AMT", DataTypes.STRING())
                        .field("OBNK_MNRC_AMT", DataTypes.STRING())
                        .field("DEP_TRRC_CBPC_CD", DataTypes.STRING())
                        .field("DRAF_CHQ_NO", DataTypes.STRING())
                        .field("CBPC_AMT", DataTypes.STRING())
                        .field("FEE_PRFR_RSCD", DataTypes.STRING())
                        .field("DEP_FEE_RCV_DVCD", DataTypes.STRING())
                        .field("FEE_TX_SEQNO", DataTypes.STRING())
                        .field("INSTL_MM", DataTypes.STRING())
                        .field("INSTL_NTH", DataTypes.STRING())
                        .field("OVRD_PPAY_DAYS", DataTypes.STRING())
                        .field("CNGT_MSG_CD", DataTypes.STRING())
                        .field("BBPR_CTNT", DataTypes.STRING())
                        .field("PB_ARR_TRGT_YN", DataTypes.STRING())
                        .field("PSWD_NSTUP_PRVS_YN", DataTypes.STRING())
                        .field("NPSWD_RSCD", DataTypes.STRING())
                        .field("ACCO_UZ_DVCD", DataTypes.STRING())
                        .field("INPUT_ACNO", DataTypes.STRING())
                        .field("BTCH_NO", DataTypes.STRING())
                        .field("AFCL_TX_YN", DataTypes.STRING())
                        .field("PRCG_APRV_TLRNO", DataTypes.STRING())
                        .field("ITLK_TX_DVCD", DataTypes.STRING())
                        .field("DEP_TRRC_TYCD", DataTypes.STRING())
                        .field("RBF_SRVC_ID", DataTypes.STRING())
                        .field("REAL_ACCP_AMT", DataTypes.STRING())
                        .field("INT_CCAM", DataTypes.STRING())
                        .field("UNPAID_BAL", DataTypes.STRING())
                        .field("FRUP_BAL_TX_DT", DataTypes.STRING())
                        .field("BNCF_TX_ANNT", DataTypes.STRING())
                        .field("BNCF_AFTR_ANNT", DataTypes.STRING())
                        .field("SLE_PAY_AMT", DataTypes.STRING())
                        .field("MDCL_FEE", DataTypes.STRING())
                        .field("BNLN_INT", DataTypes.STRING())
                        .field("FX_ITLK_TX_NO", DataTypes.STRING())
                        .field("FX_PRFR_XRT_APPC_NO", DataTypes.STRING())
                        .field("FX_APLY_XRT", DataTypes.STRING())
                        .field("FX_XRT_ANOUN_NTH", DataTypes.STRING())
                        .field("BOK_RPT_RSCD", DataTypes.STRING())
                        .field("EXPRT_CPT_YN", DataTypes.STRING())
                        .field("WCUC_AMT", DataTypes.STRING())
                        .field("WCUC_SMPL_TRFAMT", DataTypes.STRING())
                        .field("WCUC_ITLK_TRFAMT", DataTypes.STRING())
                        .field("WCUC_CASH_AMT", DataTypes.STRING())
                        .field("CNTR_MOVE_FRST_PYMN_YN", DataTypes.STRING())
                        .field("SCFC_TCFND_BZWK_DVCD", DataTypes.STRING())
                        .field("SCFC_TCFND_MESG_ID", DataTypes.STRING())
                        .field("SCFC_TCFND_PL_INT", DataTypes.STRING())
                        .field("MNRC_RQER_RLNM_DVCD", DataTypes.STRING())
                        .field("MNRC_RQER_RNNO", DataTypes.STRING())
                        .field("MNRC_RQER_NM", DataTypes.STRING())
                        .field("CMS_CD", DataTypes.STRING())
                        .field("CMS_RCPR_NM", DataTypes.STRING())
                        .field("ORTR_DT", DataTypes.STRING())
                        .field("ORTR_SEQNO", DataTypes.STRING())
                        .field("INT_CLCL_DTL_HIST_CRTN_YN", DataTypes.STRING())
                        .field("SEIZ_DMAN_MGMT_NO", DataTypes.STRING())
                        .field("AIQRY_CSCHQ_AMT", DataTypes.STRING())
                        .field("OSDCH_TX_UNQ_NO", DataTypes.STRING())
                        .field("BANK_GIRCD", DataTypes.STRING())
                        .field("GDS_DTLS_CD", DataTypes.STRING())
                        .field("WCUC_DEAL_PFLS_AMT", DataTypes.STRING())
                        .field("WCUC_ANTT_AMT", DataTypes.STRING())
                        .field("DMAN_NO", DataTypes.STRING())
                        .field("TX_RSN", DataTypes.STRING())
                        .field("FUND_ITMS_CD", DataTypes.STRING())
                        .field("MDCL_CTRINT", DataTypes.STRING())
                        .field("ITLK_TX_LDNG_YN", DataTypes.STRING())
                        .field("FRST_TRNM_IPAD", DataTypes.STRING())
                        .field("GUID", DataTypes.STRING())
                        .field("SYS_FRST_REG_DTTM", DataTypes.STRING())
                        .field("SYS_FRST_REG_EMPNO", DataTypes.STRING())
                        .field("SYS_LAST_CHNG_DTTM", DataTypes.STRING())
                        .field("SYS_LAST_CHNG_EMPNO", DataTypes.STRING())
                        .field("TRSF_BANK_CD", DataTypes.STRING())
                        .field("TRSF_ACNO", DataTypes.STRING())
                        .field("TRSF_ACCO_DEPR_NM", DataTypes.STRING())
                        .primaryKey("ACNO", "TX_DT", "TX_SEQNO", "DTL_TX_SEQNO")
                        .build()
                )
                .equalityFieldColumns(listOf("ACNO", "TX_DT", "TX_SEQNO", "DTL_TX_SEQNO"))
                .set("write.delete.mode", "copy-on-write")
                .set("write.update.mode", "copy-on-write")
                .set("write.merge.mode", "copy-on-write")
                .set(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES.key(), "${1024 * 1024 * 1024L}")
                .upsert(true)
                .append()

            env.execute("ICEBERG COW BMT")
        }
    }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] BsoBird commented on issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

Posted by "BsoBird (via GitHub)" <gi...@apache.org>.
BsoBird commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1545112810

   @yegangy0718 
   Are you going to use a HASH grouping scheme for the primary key to improve the small file problem?
   It does alleviate some of the problems, but the underlying problem is not solved.
   After all, we need to perform a rewrite operation to merge the files and clear the delete-file.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] yegangy0718 commented on issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

Posted by "yegangy0718 (via GitHub)" <gi...@apache.org>.
yegangy0718 commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1545070317

   I'm not familiar with taking CDC as source. 
   But @stevenzwu and I are working on taming small files via shuffling https://github.com/apache/iceberg/issues/6303. The basic idea is to collect data distribution information and then use it to improve data clustering so that every Iceberg writer receives specific data keys. Sharing it with you to see if it helps. 
   cc @dramaticlly  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [I] Flink DataStream Small file Issue And RewriteDataFiles Action [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action
URL: https://github.com/apache/iceberg/issues/7568


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] geonyeongkim commented on issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

Posted by "geonyeongkim (via GitHub)" <gi...@apache.org>.
geonyeongkim commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1545038880

   @BsoBird 
   Oh, that's right.
   
   Hudi Flink confirmed that it provides a compaction operator.
   
   I wanted to know if Iceberg Flink offers the same, but it doesn't seem to be supported.
   
   If there are multiple writers in the Iceberg table, an error occurs.
   
   BsoBird are you pausing stream processing and rewriting to batch?
   
   Iceberg Flink is loading very small files in kb.
   Therefore, I am not sure whether an architecture that stops the stream every few minutes so that a rewrite batch job can run is the right approach.
   
   Spark streaming allows iceberg loading to prevent small files as much as possible because it operates as microbatch, but to prevent data reversal of cdc data, an operation that writes the late one with the window function in the Spark engine must be performed.
   
   This results in significant throughput degradation due to shuffling.
   
   I ultimately want to load the cdc data as quickly as possible without getting a small file.
   
   BsoBird May I know more about your architecture utilizing Iceberg Flink?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] BsoBird commented on issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

Posted by "BsoBird (via GitHub)" <gi...@apache.org>.
BsoBird commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1557384608

   @geonyeongkim 
   You can try using: https://github.com/NetEase/arctic
   arctic provides ICEBERG management functions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [I] Flink DataStream Small file Issue And RewriteDataFiles Action [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1843918076

   This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] yegangy0718 commented on issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

Posted by "yegangy0718 (via GitHub)" <gi...@apache.org>.
yegangy0718 commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1546042171

   For merging and cleaning delete files via separate spark job, @szehon-ho has merged a PR recently https://github.com/apache/iceberg/pull/7389.  You can take a look if it helps. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] BsoBird commented on issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

Posted by "BsoBird (via GitHub)" <gi...@apache.org>.
BsoBird commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1554005738

   @geonyeongkim 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] BsoBird commented on issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

Posted by "BsoBird (via GitHub)" <gi...@apache.org>.
BsoBird commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1544261396

   @geonyeongkim 
   Hello. In my experience, performing a rewrite after submitting multiple checkpoints in a row does not work well.
   
   This is because the rewrite action is executed synchronously with the stream processing. It will cause the stream write to block(wait rewriteAction checkpoint success). The more data in the partition, the slower the execution. The end result is worse. Therefore, our side in the production environment abandoned such a processing scheme.
   
   The ideal way is to start a separate service that monitors the ICEBERG table for small files and periodically initiates asynchronous rewrite tasks. HUDI has adopted this idea, and HUDI will integrate this function into the DATASTREAM internal. But ICEBERG currently does not have built-in integration of similar features, which requires some additional development work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [I] Flink DataStream Small file Issue And RewriteDataFiles Action [iceberg]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1817684736

   This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1543767595

   @geonyeongkim I'm trying to replicate the situation on my end. Can you describe how you created the table? This Flink job will append to the table. Do you have a partitioning strategy on the table? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] geonyeongkim commented on issue #7568: Flink DataStream Small file Issue And RewriteDataFiles Action

Posted by "geonyeongkim (via GitHub)" <gi...@apache.org>.
geonyeongkim commented on issue #7568:
URL: https://github.com/apache/iceberg/issues/7568#issuecomment-1543799584

   @Fokko 
   I pre-created the table with the Hadoop catalog using Spark SQL.
   
   Below is my Spark SQL code.
   
   ```python
   spark.sql("""
   CREATE TABLE `iceberg`.`cpdep_tacct_l`.`cow` (
   `ACNO` STRING,
   `TX_DT` STRING,
   `TX_SEQNO` STRING,
   `DTL_TX_SEQNO` STRING,
   `FRMW_CHNG_TMST` STRING,
   `TX_GUID` STRING,
   `SLIP_NO` STRING,
   `DEP_TX_STCD` STRING,
   `DEP_CANC_DVCD` STRING,
   `TX_TIME` STRING,
   `RECKN_DT` STRING,
   `SYS_DT` STRING,
   `SYS_TIME` STRING,
   `DEP_TRRC_KNCD` STRING,
   `TX_CHNL_DVCD` STRING,
   `TX_FRMN_CD` STRING,
   `TX_CD` STRING,
   `TX_BRCD` STRING,
   `TX_TLRNO` STRING,
   `ACCO_MGMT_BRCD` STRING,
   `XN_PB_DVCD` STRING,
   `CURN_CD` STRING,
   `ACCO_SBCD` STRING,
   `GDS_CD` STRING,
   `AFTR_BAL_CHNG_TX_YN` STRING,
   `TX_AMT` STRING,
   `AFTR_BAL` STRING,
   `CASH_AMT` STRING,
   `SMPL_TRFAMT` STRING,
   `TOT_CBPC_AMT` STRING,
   `ITLK_ALTR_AMT` STRING,
   `CSCHQ_ISSU_AMT` STRING,
   `CSCHQ_PRVS_AMT` STRING,
   `OJBB_MNRC_AMT` STRING,
   `OBNK_MNRC_AMT` STRING,
   `DEP_TRRC_CBPC_CD` STRING,
   `DRAF_CHQ_NO` STRING,
   `CBPC_AMT` STRING,
   `FEE_PRFR_RSCD` STRING,
   `DEP_FEE_RCV_DVCD` STRING,
   `FEE_TX_SEQNO` STRING,
   `INSTL_MM` STRING,
   `INSTL_NTH` STRING,
   `OVRD_PPAY_DAYS` STRING,
   `CNGT_MSG_CD` STRING,
   `BBPR_CTNT` STRING,
   `PB_ARR_TRGT_YN` STRING,
   `PSWD_NSTUP_PRVS_YN` STRING,
   `NPSWD_RSCD` STRING,
   `ACCO_UZ_DVCD` STRING,
   `INPUT_ACNO` STRING,
   `BTCH_NO` STRING,
   `AFCL_TX_YN` STRING,
   `PRCG_APRV_TLRNO` STRING,
   `ITLK_TX_DVCD` STRING,
   `DEP_TRRC_TYCD` STRING,
   `RBF_SRVC_ID` STRING,
   `REAL_ACCP_AMT` STRING,
   `INT_CCAM` STRING,
   `UNPAID_BAL` STRING,
   `FRUP_BAL_TX_DT` STRING,
   `BNCF_TX_ANNT` STRING,
   `BNCF_AFTR_ANNT` STRING,
   `SLE_PAY_AMT` STRING,
   `MDCL_FEE` STRING,
   `BNLN_INT` STRING,
   `FX_ITLK_TX_NO` STRING,
   `FX_PRFR_XRT_APPC_NO` STRING,
   `FX_APLY_XRT` STRING,
   `FX_XRT_ANOUN_NTH` STRING,
   `BOK_RPT_RSCD` STRING,
   `EXPRT_CPT_YN` STRING,
   `WCUC_AMT` STRING,
   `WCUC_SMPL_TRFAMT` STRING,
   `WCUC_ITLK_TRFAMT` STRING,
   `WCUC_CASH_AMT` STRING,
   `CNTR_MOVE_FRST_PYMN_YN` STRING,
   `SCFC_TCFND_BZWK_DVCD` STRING,
   `SCFC_TCFND_MESG_ID` STRING,
   `SCFC_TCFND_PL_INT` STRING,
   `MNRC_RQER_RLNM_DVCD` STRING,
   `MNRC_RQER_RNNO` STRING,
   `MNRC_RQER_NM` STRING,
   `CMS_CD` STRING,
   `CMS_RCPR_NM` STRING,
   `ORTR_DT` STRING,
   `ORTR_SEQNO` STRING,
   `INT_CLCL_DTL_HIST_CRTN_YN` STRING,
   `SEIZ_DMAN_MGMT_NO` STRING,
   `AIQRY_CSCHQ_AMT` STRING,
   `OSDCH_TX_UNQ_NO` STRING,
   `BANK_GIRCD` STRING,
   `GDS_DTLS_CD` STRING,
   `WCUC_DEAL_PFLS_AMT` STRING,
   `WCUC_ANTT_AMT` STRING,
   `DMAN_NO` STRING,
   `TX_RSN` STRING,
   `FUND_ITMS_CD` STRING,
   `MDCL_CTRINT` STRING,
   `ITLK_TX_LDNG_YN` STRING,
   `FRST_TRNM_IPAD` STRING,
   `GUID` STRING,
   `SYS_FRST_REG_DTTM` STRING,
   `SYS_FRST_REG_EMPNO` STRING,
   `SYS_LAST_CHNG_DTTM` STRING,
   `SYS_LAST_CHNG_EMPNO` STRING,
   `TRSF_BANK_CD` STRING,
   `TRSF_ACNO` STRING,
   `TRSF_ACCO_DEPR_NM` STRING
   )
   USING iceberg
   TBLPROPERTIES (
       'format-version'='2',
       'write.delete.mode'='copy-on-write',
       'write.update.mode'='copy-on-write',
       'write.merge.mode'='copy-on-write',
       'write.metadata.delete-after-commit.enabled'='true',
       'write.metadata.previous-versions-max'='10'
   )
   """)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org