You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/16 09:51:54 UTC

[GitHub] [iceberg] tuziling opened a new issue #3129: flink +iceberg +s3 Data is written in real time

tuziling opened a new issue #3129:
URL: https://github.com/apache/iceberg/issues/3129


   @Slf4j
   public class AppendingData {
   
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           log.info("AppendingData 程序开始 ......");
   
           //读取kafka的数据
           Properties properties = new Properties();
           properties.setProperty("bootstrap.servers","10.0.0.53:9092");
           properties.setProperty("zookeeper.connect","10.0.0.53:2181");
           properties.setProperty("group.id","test-1451");
           String topicName = "test";
           FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties);
           kafkaConsumer.setStartFromLatest();
           DataStream<Tuple2<Long, String>> kafkaData = env.addSource(kafkaConsumer).flatMap(new FlatMapFunction<String, Tuple2<Long, String>>() {
               @Override
               public void flatMap(String value, Collector<Tuple2<Long, String>> out) throws Exception {
                   if (value != null && value.length() > 0) {
                       String[] arr = value.split(" ");
                       for (int i = 0; i < arr.length; i++) {
                           if (arr[i] != null && arr[i].length() > 0) {
                               out.collect(new Tuple2<>(new Long((long) arr[i].length()), arr[i]));
                           }
                       }
                   }
               }
           });
           DataStream<RowData> input = kafkaData.map(new MapFunction<Tuple2<Long, String>, RowData>() {
               @Override
               public RowData map(Tuple2<Long, String> value) throws Exception {
                   GenericRowData genericRow = new GenericRowData(2);
                   genericRow.setField(0,value.f0);
                   genericRow.setField(1, StringData.fromString(value.f1));
                   RowData rowData = genericRow;
                   log.info("map function --> rowData={}",rowData);
                   return rowData;
               }
           });
        
           String catalogName = "songfuhao_catalog";
           Configuration hadoopConf = new Configuration();
           Map<String, String> props = new HashMap<>();
           props.put("warehouse", "s3://songfuhao-bucket/songfuhao");
           props.put("catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog");
           props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
           props.put("lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager");
           props.put("lock.table", "myGlueLockTable");
   
           props.put("AWS_ACCESS_KEY_ID", "AKIAYBGQUCQ7MFKJTKUV");
           props.put("AWS_SECRET_ACCESS_KEY", "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");
           props.put("AWS_DEFAULT_REGION", "cn-northwest-1");
           String impl = "org.apache.iceberg.aws.glue.GlueCatalog";
           CatalogLoader tzlCatalog = CatalogLoader.custom(catalogName, props, hadoopConf, impl);
           TableLoader tableLoader = TableLoader.fromCatalog(tzlCatalog, TableIdentifier.of("songfh", "sfuhao_test"));
   
           FlinkSink.forRowData(input)
                   .tableLoader(tableLoader)
               //  .overwrite(true)
                   .build();
   
           env.execute("Test Iceberg DataStream");
       }
   }
   
   The data in Kafka can be consumed, that is, the data is not finally entered into S3, and there is no error when running. Does anyone know what happened?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] tuziling closed issue #3129: flink +iceberg +s3 Data is written in real time

Posted by GitBox <gi...@apache.org>.
tuziling closed issue #3129:
URL: https://github.com/apache/iceberg/issues/3129


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] tuziling commented on issue #3129: flink +iceberg +s3 Data is written in real time

Posted by GitBox <gi...@apache.org>.
tuziling commented on issue #3129:
URL: https://github.com/apache/iceberg/issues/3129#issuecomment-921397244


   我已经在GenericRowData中设置了RowKind ,不过数据还是没有进入到iceberg表中。你的数据实时写入到iceberg表了吗?能不能将完整的代码给我看一下,谢谢!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] bvinayakumar commented on issue #3129: flink +iceberg +s3 Data is written in real time

Posted by GitBox <gi...@apache.org>.
bvinayakumar commented on issue #3129:
URL: https://github.com/apache/iceberg/issues/3129#issuecomment-920893710


   @tuziling I faced similar issues but running the code in debug mode in IDE helped catch the exact exception. I observed that you are not setting RowKind on GenericRowData. Hope this helps.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] tuziling commented on issue #3129: flink +iceberg +s3 Data is written in real time

Posted by GitBox <gi...@apache.org>.
tuziling commented on issue #3129:
URL: https://github.com/apache/iceberg/issues/3129#issuecomment-921397336


   I have set RowKind in GenericRowData, but the data still does not enter the iceberg table. Is your data written to the iceberg table in real time? Can you show me the complete code, thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] tuziling commented on issue #3129: flink +iceberg +s3 Data is written in real time

Posted by GitBox <gi...@apache.org>.
tuziling commented on issue #3129:
URL: https://github.com/apache/iceberg/issues/3129#issuecomment-921643346


   How does flink update the data in iceberg, do I need to specify the primary key? Is it possible to use RowKind.UPDATE_AFTER or RowKind.UPDATE_BEFORE?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] tuziling removed a comment on issue #3129: flink +iceberg +s3 Data is written in real time

Posted by GitBox <gi...@apache.org>.
tuziling removed a comment on issue #3129:
URL: https://github.com/apache/iceberg/issues/3129#issuecomment-921397244


   我已经在GenericRowData中设置了RowKind ,不过数据还是没有进入到iceberg表中。你的数据实时写入到iceberg表了吗?能不能将完整的代码给我看一下,谢谢!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massquantity commented on issue #3129: flink +iceberg +s3 Data is written in real time

Posted by GitBox <gi...@apache.org>.
massquantity commented on issue #3129:
URL: https://github.com/apache/iceberg/issues/3129#issuecomment-924565407


   Maybe you should turn on checkpoint in Flink? See this issue:  https://github.com/apache/iceberg/issues/2693


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org