You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/02 04:09:23 UTC

[GitHub] [iceberg] openinx opened a new pull request #1158: Flink: Add Orc value reader, writer implementations

openinx opened a new pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#issuecomment-664179724


   According to the issue https://github.com/apache/iceberg/issues/1215,  we've upgraded the flink to 1.11 version and planed to support the RowData avro, parquet, orc readers and writers,  so I will create a new pull request with RowData implementation. Close this one now. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#issuecomment-656950457


   Looks like there are some minor things to clean up, but overall the code changes are close to being ready.
   
   Before merging this, I'd like to understand how Flink will work with the data that is produced so we can make a good decision about whether we should continue with ORC like we have for Parquet and Avro (sharing generics code) or whether we should think about building separate readers and writers for its object model. Thanks for bringing this up, @rdsr!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#issuecomment-653616737


   I'll have a look this week, thks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r452299179



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcWriter.java
##########
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public abstract class BaseOrcWriter<T> implements OrcValueWriter<T> {
+  private final Converter[] converters;
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  protected BaseOrcWriter(TypeDescription schema) {
+    this.converters = buildConverters(schema);
+  }
+
+  protected Converter[] getConverters() {
+    return this.converters;
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  protected interface Converter<T> {
+
+    Class<T> getJavaClass();
+
+    /**
+     * Take a value from the Spark data value and add it to the ORC output.

Review comment:
       here as well




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#issuecomment-653615555


   @shardulm94 and @rdsr, could you help review this?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r453637953



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       @edgarRd @rdsr @rdblue I did a refactor for the GenericOrcWriter and moved the common writers to `GenericOrcWriter`, the pull request is here: https://github.com/apache/iceberg/pull/1197/files. Mind to take a look ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r453131014



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
##########
@@ -53,86 +50,13 @@ public GenericOrcReader(
 
   @Override
   public Record read(VectorizedRowBatch batch, int row) {
-    return (Record) reader.read(new StructColumnVector(batch.size, batch.cols), row);
+    return (Record) getReader().read(new StructColumnVector(batch.size, batch.cols), row);

Review comment:
       Getter methods should not start with `get`. It doesn't add anything (every method "gets" its return value) and is not idiomatic in other JVM languages.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
edgarRd commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r452359199



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcWriter.java
##########
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public abstract class BaseOrcWriter<T> implements OrcValueWriter<T> {
+  private final Converter[] converters;
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

Review comment:
       I think you can use `DateTimeUtil.EPOCH` and `DateTimeUtil.EPOCH_DAY` instead.

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       I think I agree with @rdsr - main concern would be the flexibility for changes to `GenericOrcReader`, but I guess the trade-off is vs code re-usability. If there's confidence that the generics readers are fairly stable then it should not be a huge issue, but the concern seems valid on coupling these readers. I wonder if instead of using inheritance a delegator approach would be possible avoid a tight coupling.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r452298913



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcWriter.java
##########
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public abstract class BaseOrcWriter<T> implements OrcValueWriter<T> {
+  private final Converter[] converters;
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  protected BaseOrcWriter(TypeDescription schema) {
+    this.converters = buildConverters(schema);
+  }
+
+  protected Converter[] getConverters() {
+    return this.converters;
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to

Review comment:
       nit: Remove Spark from comments




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r451323369



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       It seems that we are extending the `GenericOrcReader` to also be used in Flink. Won't that create problems? E.g `GenericOrcReader` is being used to construct a `readerFunc` for IcebergGenerics . For instance it uses `LocalTime` for  Iceberg's Time datatype. Won't Flink have its own in-memory representation for primitive types and maybe also for map and list types?
   
   I think it will be better to have a completely separate `FlinkOrcReader` which does not rely on `GenericOrcReader` similar to `SparkOrcReader`. In this way changes to `GenericOrcReader` won't break `FlinkOrcReader` and there is no tight coupling between the two.
   
   @rdblue , @openinx thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r453547032



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       I agreed that extending the `BaseOrcReader`  and `BaseOrcWriter` introduces tight coupling, I tried to de-couple `flink` writer from generic orc writers and let them share the common writers. But seems it's hard to share the codes because we used a static `buildConverter` method to build the converter for each data type and few Converter depends on the static `buildConverter`, makes hard to abstract to the common converters.  Just curious why did we implement the orc writer in `converter` way instead of visiting the types by `OrcSchemaWithTypeVisitor` and generate relative `OrcRowWriter`  (in this way we could share most of the writers.),  the current `converter` seems strange compared to other `parquet` writers and `avro` writers. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r452434696



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcWriter.java
##########
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public abstract class BaseOrcWriter<T> implements OrcValueWriter<T> {
+  private final Converter[] converters;
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  protected BaseOrcWriter(TypeDescription schema) {
+    this.converters = buildConverters(schema);
+  }
+
+  protected Converter[] getConverters() {
+    return this.converters;
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  protected interface Converter<T> {
+
+    Class<T> getJavaClass();
+
+    /**
+     * Take a value from the Spark data value and add it to the ORC output.
+     *
+     * @param rowId  the row in the ColumnVector
+     * @param data   either an InternalRow or ArrayData
+     * @param output the ColumnVector to put the value into
+     */
+    void addValue(int rowId, T data, ColumnVector output);
+  }
+
+  static class BooleanConverter implements Converter<Boolean> {
+    @Override
+    public Class<Boolean> getJavaClass() {
+      return Boolean.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Boolean data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data ? 1 : 0;
+      }
+    }
+  }
+
+  static class ByteConverter implements Converter<Byte> {
+    @Override
+    public Class<Byte> getJavaClass() {
+      return Byte.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Byte data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class ShortConverter implements Converter<Short> {
+    @Override
+    public Class<Short> getJavaClass() {
+      return Short.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Short data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class IntConverter implements Converter<Integer> {
+    @Override
+    public Class<Integer> getJavaClass() {
+      return Integer.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Integer data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class TimeConverter implements Converter<LocalTime> {
+    @Override
+    public Class<LocalTime> getJavaClass() {
+      return LocalTime.class;
+    }
+
+    @Override
+    public void addValue(int rowId, LocalTime data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000;
+      }
+    }
+  }
+
+  static class LongConverter implements Converter<Long> {
+    @Override
+    public Class<Long> getJavaClass() {
+      return Long.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Long data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class FloatConverter implements Converter<Float> {
+    @Override
+    public Class<Float> getJavaClass() {
+      return Float.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Float data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class DoubleConverter implements Converter<Double> {
+    @Override
+    public Class<Double> getJavaClass() {
+      return Double.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Double data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  static class StringConverter implements Converter<String> {
+    @Override
+    public Class<String> getJavaClass() {
+      return String.class;
+    }
+
+    @Override
+    public void addValue(int rowId, String data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        byte[] value = data.getBytes(StandardCharsets.UTF_8);
+        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+      }
+    }
+  }
+
+  static class BytesConverter implements Converter<ByteBuffer> {
+    @Override
+    public Class<ByteBuffer> getJavaClass() {
+      return ByteBuffer.class;
+    }
+
+    @Override
+    public void addValue(int rowId, ByteBuffer data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length);
+      }
+    }
+  }
+
+  static class UUIDConverter implements Converter<UUID> {
+    @Override
+    public Class<UUID> getJavaClass() {
+      return UUID.class;
+    }
+
+    @Override
+    public void addValue(int rowId, UUID data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ByteBuffer buffer = ByteBuffer.allocate(16);
+        buffer.putLong(data.getMostSignificantBits());
+        buffer.putLong(data.getLeastSignificantBits());
+        ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length);
+      }
+    }
+  }
+
+  static class FixedConverter implements Converter<byte[]> {
+    @Override
+    public Class<byte[]> getJavaClass() {
+      return byte[].class;
+    }
+
+    @Override
+    public void addValue(int rowId, byte[] data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((BytesColumnVector) output).setRef(rowId, data, 0, data.length);
+      }
+    }
+  }
+
+  static class DateConverter implements Converter<LocalDate> {
+    @Override
+    public Class<LocalDate> getJavaClass() {
+      return LocalDate.class;
+    }
+
+    @Override
+    public void addValue(int rowId, LocalDate data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data);
+      }
+    }
+  }
+
+  static class TimestampTzConverter implements Converter<OffsetDateTime> {
+    @Override
+    public Class<OffsetDateTime> getJavaClass() {
+      return OffsetDateTime.class;
+    }
+
+    @Override
+    public void addValue(int rowId, OffsetDateTime data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        TimestampColumnVector cv = (TimestampColumnVector) output;
+        cv.time[rowId] = data.toInstant().toEpochMilli(); // millis
+        cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision
+      }
+    }
+  }
+
+  static class TimestampConverter implements Converter<LocalDateTime> {
+
+    @Override
+    public Class<LocalDateTime> getJavaClass() {
+      return LocalDateTime.class;
+    }
+
+    @Override
+    public void addValue(int rowId, LocalDateTime data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        TimestampColumnVector cv = (TimestampColumnVector) output;
+        cv.setIsUTC(true);
+        cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis
+        cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision
+      }
+    }
+  }
+
+  static class Decimal18Converter implements Converter<BigDecimal> {
+    private final int scale;
+
+    Decimal18Converter(TypeDescription schema) {
+      this.scale = schema.getScale();
+    }
+
+    @Override
+    public Class<BigDecimal> getJavaClass() {
+      return BigDecimal.class;
+    }
+
+    @Override
+    public void addValue(int rowId, BigDecimal data, ColumnVector output) {
+      // TODO: validate precision and scale from schema
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DecimalColumnVector) output).vector[rowId]
+            .setFromLongAndScale(data.unscaledValue().longValueExact(), scale);
+      }
+    }
+  }
+
+  static class Decimal38Converter implements Converter<BigDecimal> {
+    Decimal38Converter(TypeDescription schema) {
+    }
+
+    @Override
+    public Class<BigDecimal> getJavaClass() {
+      return BigDecimal.class;
+    }
+
+    @Override
+    public void addValue(int rowId, BigDecimal data, ColumnVector output) {
+      // TODO: validate precision and scale from schema
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false));
+      }
+    }
+  }
+
+  protected abstract Converter<T> createStructConverter(TypeDescription schema);

Review comment:
       It seems apart from Struct, flink will use the same in-memory objects for map, list and primitive types?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#issuecomment-655270652


   @rdsr  Please help to take a look when you have time , Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r451323369



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       It seems that we are extending the `GenericOrcReader` to also be used in Flink. Won't that create problems? E.g `GenericOrcReader` is being used to construct a `readerFunc` for IcebergGenerics . For instance it uses `LocalTime` for  Iceberg's Time datatype. Won't Flink have its own in-memory representation for primitive types and maybe also for map and list types?
   
   I think it will be better to have a completely separate `FlinkOrcReader` similar to `SparkOrcReader`
   
   @rdblue , @openinx thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r458260259



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       @chenjunjiedada opened an issue for the concern about data types and @JingsongLi clarified the types that Flink uses there. @rdsr was right and it isn't correct to copy generics with a different row type.
   
   Sounds like #1197 is a good start. We should probably reverse how we have refactored the Avro and Parquet generics as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r453216014



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       Yea, I also think the `GenericOrcReader` is a pretty small wrapper and the bulk of the functionality is provided by the readers/functions for specific types defined in `OrcGenericReaders`. In that regard extending the `GenericOrcReader` doesn't buy us much. We can easily share code by picking and choosing the right readers/functions from `GenericOrcReaders` and providing flink specific type readers where flink types diverge from Iceberg Generics.   The good thing about doing this IMO is that we get rid of extending classes which makes code changes brittle and introduces tight coupling




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r451323369



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       It seems that we are extending the `GenericORCReader` to also be used in Flink. Won't that create problems? E.g `GenericORCReader` is being used to construct a readerFunc for IcebergGenerics . For instance it uses `LocalTime`  Iceberg's Time datatype. Won't Flink have its own im-memory representation for primitive types and maybe also for map and list types?
   I think it would have been better to have a completely separate `FlinkOrcReader` similar to `SparkOrcReader`
   
   @rdblue , @openinx thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r453131276



##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.types.Row;
+import org.apache.iceberg.data.orc.BaseOrcWriter;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+public class FlinkOrcWriter extends BaseOrcWriter<Row> {
+
+  private FlinkOrcWriter(TypeDescription schema) {
+    super(schema);
+  }
+
+  public static OrcValueWriter<Row> buildWriter(TypeDescription fileSchema) {
+    return new FlinkOrcWriter(fileSchema);
+  }
+
+  @Override
+  protected Converter<Row> createStructConverter(TypeDescription schema) {
+    return new RowConverter(schema);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void write(Row value, VectorizedRowBatch output) {
+    int row = output.size++;

Review comment:
       We avoid using return values from `++` expressions. That helps readability because statement order is clear.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r451926889



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       In current flink stable version,  Flink is using the `Row` type with  an array of Java objects, it's the most common way for flink now.  In feature, it will use `RowData`  interface , whose implementation could be binary-oriented or java object oriented, I think in that time we could separate the `FlinkOrcReader`.  (issue: https://issues.apache.org/jira/browse/FLINK-16995). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx closed pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
openinx closed pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r453216014



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       Yea, I also think the `GenericOrcReader` is a pretty small wrapper and the bulk of the functionality is baked into the readers/functions for specific types defined in `OrcGenericReaders`. In that regard extending the `GenericOrcReader` doesn't buy us much. We can easily share code by picking and choosing the right readers/functions from `GenericOrcReaders` and providing flink specific type readers where flink types diverge from Iceberg Generics.   The good thing about doing this IMO is that we get rid of extending classes which makes code changes brittle and introduces tight coupling




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r453216014



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       Yea, I also think the GenericOrcReader is a pretty small wrapper and the bulk of the functionality is baked into the readers/functions for specific types defined in `OrcGenericReaders`. In that regard extending the `GenericOrcReader` doesn't buy us much. We can easily share code by picking and choosing the right readers/functions from `GenericOrcReaders` and providing flink specific type readers where flink types diverge from Iceberg Generics.   The good thing about doing this IMO is that we get rid of extending classes which makes code changes brittle and introduces tight coupling




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r453130752



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       > What about Flink's primitive types do they align with well with Iceberg Generics?
   
   My understanding is that Flink does (or can) use the same representations, except for structs. It would be good to have a response for @openinx or @JingsongLi, though. From looking at the Flink code, not all of the default conversions are these types. `VarBinary` uses [`byte[]`](https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarBinaryType.java#L57) instead of `ByteBuffer` and `LocalZonedTimestampType` uses [`Instant`](https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java#L78) (but the Javadoc says its behavior is like `OffsetDateTime` that we use). That said, it looks like Flink might support multiple conversions.
   
   Depending on what Flink uses internally, @rdsr might be right about building a set of readers specific to those types. But if we can make this more generic easily, then I like the idea of doing that. Ideally, I think new object models would be created by providing a few methods to create and read into an object, kind of like our [methods to plug in struct types](https://github.com/apache/iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java#L708-L723).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r458494939



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       Yes, you are right.   After the #1197 get merged, I will  recreate this patch for reviewing.   Thanks. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1158: Flink: Add Orc value reader, writer implementations

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1158:
URL: https://github.com/apache/iceberg/pull/1158#discussion_r452285009



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+
+public abstract class BaseOrcReader<T> implements OrcRowReader<T> {

Review comment:
       What about Flink's primitive types do they align with well with Iceberg Generics?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org