Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/25 11:02:08 UTC

[GitHub] [iceberg] Reo-LEI opened a new pull request #2862: Flink: Enrich flink actions

Reo-LEI opened a new pull request #2862:
URL: https://github.com/apache/iceberg/pull/2862


   Currently, the Flink actions only include `RewriteDataFilesAction`. I plan to implement the `expireSnapshots`, `rewriteManifests`, and `deleteOrphanFiles` actions for Flink. I will refer to the `SparkActions` implementation and try to port them to Flink.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Reo-LEI commented on pull request #2862: Flink: Enrich flink actions

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on pull request #2862:
URL: https://github.com/apache/iceberg/pull/2862#issuecomment-886186358


   The `expireSnapshots` implementation is now finished, but I'm blocked by a Kryo serialization error during testing.
   
   Kryo raises an NPE when serializing the `hadoopConf` field of `HadoopFileIO` while Flink creates the `CombinedScanTask` source. The full stack trace follows:
   ```
   Exception in thread "main" java.lang.RuntimeException: Serializing the source elements failed: java.lang.NullPointerException
   Serialization trace:
   hadoopConf (org.apache.iceberg.hadoop.HadoopFileIO)
   io (org.apache.iceberg.AllManifestsTable$ManifestListReadTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:1046)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:1021)
   	at org.apache.iceberg.flink.source.MetadataTableSource$Builder.build(MetadataTableSource.java:92)
   	at org.apache.iceberg.flink.actions.BaseFlinkAction.loadMetadataTable(BaseFlinkAction.java:117)
   	at org.apache.iceberg.flink.actions.BaseFlinkAction.buildValidDataFileTable(BaseFlinkAction.java:92)
   	at org.apache.iceberg.flink.actions.ExpireSnapshotsAction.buildValidFileTable(ExpireSnapshotsAction.java:122)
   	at org.apache.iceberg.flink.actions.ExpireSnapshotsAction.expire(ExpireSnapshotsAction.java:139)
   	at org.apache.iceberg.flink.actions.ExpireSnapshotsAction.execute(ExpireSnapshotsAction.java:117)
   	at org.apache.iceberg.flink.actions.ExpireSnapshotsAction.execute(ExpireSnapshotsAction.java:44)
   	at cn.walden.stream.IcebergFlinkActions.main(IcebergFlinkActions.java:28)
   Caused by: java.io.IOException: Serializing the source elements failed: java.lang.NullPointerException
   Serialization trace:
   hadoopConf (org.apache.iceberg.hadoop.HadoopFileIO)
   io (org.apache.iceberg.AllManifestsTable$ManifestListReadTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.<init>(FromElementsFunction.java:93)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:1044)
   	... 9 more
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
   Serialization trace:
   hadoopConf (org.apache.iceberg.hadoop.HadoopFileIO)
   io (org.apache.iceberg.AllManifestsTable$ManifestListReadTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
   	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
   	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289)
   	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
   	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
   	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:316)
   	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.<init>(FromElementsFunction.java:89)
   	... 10 more
   Caused by: java.lang.NullPointerException
   	at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
   	at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
   	... 23 more
   ```
   
   I'm trying to figure out the cause, but I have been blocked on this for a while and don't have a clue. Could someone give me a little help? @rdblue @aokolnychyi @openinx




[GitHub] [iceberg] Reo-LEI commented on a change in pull request #2862: Flink: Enrich flink actions

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on a change in pull request #2862:
URL: https://github.com/apache/iceberg/pull/2862#discussion_r678471846



##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/FlinkActions.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import com.twitter.chill.java.ClosureSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.ExpireSnapshots;
+
+public class FlinkActions implements ActionsProvider {
+
+  private static final Configuration CONFIG = new Configuration()
+      // disable classloader check as Avro may cache class/object in the serializers.
+      .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+  private final StreamExecutionEnvironment env;
+
+  private FlinkActions(StreamExecutionEnvironment env) {
+    // register ClosureSerializer to prevent NPE when Kryo serialize lambda expr(e.g. HadoopFileIO.hadoopConf)
+    env.getConfig().registerTypeWithKryoSerializer(ClosureSerializer.Closure.class, ClosureSerializer.class);

Review comment:
       I fixed the Kryo NPE by registering `ClosureSerializer` with Flink. Kryo serializes lambdas through `ClosureSerializer.Closure`, but no serializer is registered for that class. I think this is a bug in Kryo or Flink. I hope this helps you, @kbendick.
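
For background on why a lambda-valued field needs special handling at all, here is a minimal sketch that uses only plain JDK serialization. It does not use Kryo, Flink, or Iceberg, and `LambdaRoundTrip`, its nested `SerializableSupplier`, and the `constant` helper are hypothetical names made up for illustration. Under Java serialization a lambda only qualifies when its target interface extends `Serializable`, and it is then written as a `java.lang.invoke.SerializedLambda` stand-in rather than field by field. A field-based serializer has no such path by default, which seems consistent with the need to register a dedicated serializer as described above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

final class LambdaRoundTrip {

  // Hypothetical stand-in: a supplier type whose lambdas are Java-serializable.
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {
  }

  // Returns a serializable lambda that captures the given value.
  static SerializableSupplier<String> constant(String value) {
    return () -> value;
  }

  // Writes the supplier with plain Java serialization and reads it back.
  static <T> SerializableSupplier<T> roundTrip(SerializableSupplier<T> supplier) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(supplier);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        SerializableSupplier<T> copy = (SerializableSupplier<T>) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Java serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    // A lambda targeting plain Supplier does not implement Serializable.
    Supplier<String> plain = () -> "not serializable";
    System.out.println(plain instanceof Serializable);

    // A lambda targeting the Serializable sub-interface survives a round trip.
    SerializableSupplier<String> copy = roundTrip(constant("fs.defaultFS"));
    System.out.println(copy.get());
  }
}
```

The sketch only demonstrates the JDK mechanism; whether the NPE in this PR comes from exactly this gap is an assumption, not something the stack trace alone confirms.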






[GitHub] [iceberg] rdblue commented on pull request #2862: Flink: Enrich flink actions

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2862:
URL: https://github.com/apache/iceberg/pull/2862#issuecomment-886242418


   Looks like the FileIO was not prepared to be serialized by wrapping its Hadoop Configuration in SerializableConfiguration.
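
As a rough illustration of the wrapping idea, the sketch below shows the general pattern with JDK classes only. `PlainConf` is a hypothetical stand-in for a configuration class that is not `Serializable`, and `SerializableConf` is a hypothetical wrapper; neither is Iceberg's actual `SerializableConfiguration` nor Hadoop's `Configuration`, and the real classes may work differently. The wrapper keeps the configuration in a `transient` field and writes its key/value pairs explicitly, so the serializer never has to walk the wrapped object itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

final class ConfWrapperSketch {

  // Hypothetical stand-in for a configuration class that is NOT Serializable.
  static final class PlainConf {
    private final Map<String, String> props = new LinkedHashMap<>();

    void set(String key, String value) {
      props.put(key, value);
    }

    String get(String key) {
      return props.get(key);
    }

    Map<String, String> asMap() {
      return props;
    }
  }

  // Hypothetical wrapper: serializes the key/value pairs instead of the object.
  static final class SerializableConf implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient PlainConf conf;

    SerializableConf(PlainConf conf) {
      this.conf = conf;
    }

    PlainConf get() {
      return conf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
      out.defaultWriteObject();
      // Copy into a plain map, which is itself Serializable.
      out.writeObject(new LinkedHashMap<>(conf.asMap()));
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      @SuppressWarnings("unchecked")
      Map<String, String> copy = (Map<String, String>) in.readObject();
      // Rebuild a fresh configuration on the receiving side.
      PlainConf rebuilt = new PlainConf();
      copy.forEach(rebuilt::set);
      this.conf = rebuilt;
    }
  }

  // Serializes the wrapper and reads it back, as a task shipped to a worker would be.
  static SerializableConf roundTrip(SerializableConf original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SerializableConf) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  public static void main(String[] args) {
    PlainConf conf = new PlainConf();
    conf.set("fs.defaultFS", "hdfs://example:8020");
    System.out.println(roundTrip(new SerializableConf(conf)).get().get("fs.defaultFS"));
  }
}
```

The trade-off in this pattern is that the receiving side gets a rebuilt copy rather than the original instance, so anything not captured in the key/value pairs is lost.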




[GitHub] [iceberg] Reo-LEI edited a comment on pull request #2862: Flink: Enrich flink actions

Posted by GitBox <gi...@apache.org>.
Reo-LEI edited a comment on pull request #2862:
URL: https://github.com/apache/iceberg/pull/2862#issuecomment-886186358


   The `expireSnapshots` implementation is now finished, but I'm blocked by a Kryo serialization error during testing.
   
   Kryo raises an NPE when serializing the `hadoopConf` field of `HadoopFileIO` while Flink creates the `CombinedScanTask` source. The full stack trace follows:
   ```
   Exception in thread "main" java.lang.RuntimeException: Serializing the source elements failed: java.lang.NullPointerException
   Serialization trace:
   hadoopConf (org.apache.iceberg.hadoop.HadoopFileIO)
   io (org.apache.iceberg.AllManifestsTable$ManifestListReadTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:1046)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:1021)
   	at org.apache.iceberg.flink.source.MetadataTableSource$Builder.build(MetadataTableSource.java:92)
   	at org.apache.iceberg.flink.actions.BaseFlinkAction.loadMetadataTable(BaseFlinkAction.java:117)
   	at org.apache.iceberg.flink.actions.BaseFlinkAction.buildValidDataFileTable(BaseFlinkAction.java:92)
   	at org.apache.iceberg.flink.actions.ExpireSnapshotsAction.buildValidFileTable(ExpireSnapshotsAction.java:122)
   	at org.apache.iceberg.flink.actions.ExpireSnapshotsAction.expire(ExpireSnapshotsAction.java:139)
   	at org.apache.iceberg.flink.actions.ExpireSnapshotsAction.execute(ExpireSnapshotsAction.java:117)
   	at org.apache.iceberg.flink.actions.ExpireSnapshotsAction.execute(ExpireSnapshotsAction.java:44)
   	at cn.walden.stream.IcebergFlinkActions.main(IcebergFlinkActions.java:28)
   Caused by: java.io.IOException: Serializing the source elements failed: java.lang.NullPointerException
   Serialization trace:
   hadoopConf (org.apache.iceberg.hadoop.HadoopFileIO)
   io (org.apache.iceberg.AllManifestsTable$ManifestListReadTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.<init>(FromElementsFunction.java:93)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:1044)
   	... 9 more
   Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
   Serialization trace:
   hadoopConf (org.apache.iceberg.hadoop.HadoopFileIO)
   io (org.apache.iceberg.AllManifestsTable$ManifestListReadTask)
   tasks (org.apache.iceberg.BaseCombinedScanTask)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
   	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
   	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348)
   	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289)
   	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
   	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
   	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:316)
   	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.<init>(FromElementsFunction.java:89)
   	... 10 more
   Caused by: java.lang.NullPointerException
   	at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
   	at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
   	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
   	... 23 more
   ```
   
   I'm trying to figure out the cause, but I have been blocked on this for a while and don't have a clue. Could someone give me a little help? @rdblue @aokolnychyi @openinx




[GitHub] [iceberg] kbendick commented on pull request #2862: Flink: Enrich flink actions

Posted by GitBox <gi...@apache.org>.
kbendick commented on pull request #2862:
URL: https://github.com/apache/iceberg/pull/2862#issuecomment-886280201


   I have _potentially_ come across a similar issue when using ORC-1.7.0-SNAPSHOT (which you're definitely not using here). It may _possibly_ be related to `SerializableConfiguration`; I am still debugging.

