Posted to issues@paimon.apache.org by "leaves12138 (via GitHub)" <gi...@apache.org> on 2023/11/03 09:31:33 UTC

[PR] [core] index mapping push down to format reader and skip partition value read from file [incubator-paimon]

leaves12138 opened a new pull request, #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257

   ### Purpose
   
   1. Index mapping through `ProjectedRow` performs poorly. Pushing the index mapping down to the format reader and expanding the vectors directly could speed it up.
   
   2. Partition values do not need to be read from the data files, so we skip reading partition columns from files.
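
To illustrate the first point: a row-level wrapper such as `ProjectedRow` consults the index mapping on every field access, whereas a columnar batch can have its vectors reordered once up front. A minimal sketch, assuming plain `int[]` arrays as stand-ins for column vectors; the class and method names are hypothetical and are not Paimon's API:

```java
/** Hypothetical sketch: remap column vectors once per batch instead of once per field access. */
final class IndexMappingSketch {

    /**
     * Returns the vectors in output order.
     * indexMapping[i] is the source vector for output column i; a negative value
     * marks a column that is absent from the file and is left as null here.
     */
    static int[][] mapVectors(int[] indexMapping, int[][] vectors) {
        int[][] mapped = new int[indexMapping.length][];
        for (int i = 0; i < indexMapping.length; i++) {
            mapped[i] = indexMapping[i] < 0 ? null : vectors[indexMapping[i]];
        }
        return mapped;
    }
}
```

Only references are rearranged, so the cost is proportional to the number of columns rather than to the number of rows read.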
   
   ### API and Format
   
   No impact on users, barring bugs.
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1393835619


##########
paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java:
##########
@@ -55,4 +57,13 @@ public InternalRow next() {
             return null;
         }
     }
+
+    public ColumnarRowIterator mapping(
+            @Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) {
+        if (!mapped) {
+            rowData.mapping(partitionInfo, indexMapping);
+            mapped = true;
+        }
+        return this;

Review Comment:
   Fixed this.



##########
paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java:
##########
@@ -221,4 +225,18 @@ public int hashCode() {
         throw new UnsupportedOperationException(
                 "ColumnarRowData do not support hashCode, please hash fields one by one!");
     }
+
+    public void mapping(@Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) {

Review Comment:
   Fixed this.





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1393836077


##########
paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.PartitionInfo;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+/** An implementation of {@link InternalRow} which provides a row the fixed partition value. */
+public class PartitionSettedRow implements InternalRow {
+
+    private final BinaryRow paritition;

Review Comment:
   Fixed this.





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1392555798


##########
paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java:
##########
@@ -112,41 +115,65 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
                             key -> {
                                 TableSchema tableSchema = schemaManager.schema(this.schemaId);
                                 TableSchema dataSchema = schemaManager.schema(key.schemaId);
+
+                                // projection to data schema
                                 int[][] dataProjection =
                                         SchemaEvolutionUtil.createDataProjection(
                                                 tableSchema.fields(),
                                                 dataSchema.fields(),
                                                 projection);
-                                RowType rowType = dataSchema.logicalRowType();
+
                                 IndexCastMapping indexCastMapping =
                                         SchemaEvolutionUtil.createIndexCastMapping(
                                                 Projection.of(projection).toTopLevelIndexes(),
                                                 tableSchema.fields(),
                                                 Projection.of(dataProjection).toTopLevelIndexes(),
                                                 dataSchema.fields());
+
                                 List<Predicate> dataFilters =
                                         this.schemaId == key.schemaId
                                                 ? filters
                                                 : SchemaEvolutionUtil.createDataFilters(
                                                         tableSchema.fields(),
                                                         dataSchema.fields(),
                                                         filters);
+
+                                Pair<int[], RowType> partitionPair = null;
+                                if (dataSchema.partitionKeys().size() != 0) {
+                                    Pair<int[], int[][]> partitionMappping =
+                                            PartitionUtils.constructPartitionMapping(
+                                                    dataSchema, dataProjection);
+                                    dataProjection = partitionMappping.getRight();
+                                    partitionPair =
+                                            Pair.of(
+                                                    partitionMappping.getLeft(),
+                                                    dataSchema.projectedLogicalRowType(
+                                                            dataSchema.partitionKeys()));
+                                }
+
                                 return new BulkFormatMapping(
                                         indexCastMapping.getIndexMapping(),
                                         indexCastMapping.getCastMapping(),
+                                        partitionPair,
                                         formatDiscover
                                                 .discover(formatIdentifier)
                                                 .createReaderFactory(
-                                                        rowType, dataProjection, dataFilters));
+                                                        dataSchema.logicalRowType(),

Review Comment:
   Done.





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1391123949


##########
paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java:
##########
@@ -112,41 +115,65 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOExceptio
                             key -> {
                                 TableSchema tableSchema = schemaManager.schema(this.schemaId);
                                 TableSchema dataSchema = schemaManager.schema(key.schemaId);
+
+                                // projection to data schema
                                 int[][] dataProjection =
                                         SchemaEvolutionUtil.createDataProjection(
                                                 tableSchema.fields(),
                                                 dataSchema.fields(),
                                                 projection);
-                                RowType rowType = dataSchema.logicalRowType();
+
                                 IndexCastMapping indexCastMapping =
                                         SchemaEvolutionUtil.createIndexCastMapping(
                                                 Projection.of(projection).toTopLevelIndexes(),
                                                 tableSchema.fields(),
                                                 Projection.of(dataProjection).toTopLevelIndexes(),
                                                 dataSchema.fields());
+
                                 List<Predicate> dataFilters =
                                         this.schemaId == key.schemaId
                                                 ? filters
                                                 : SchemaEvolutionUtil.createDataFilters(
                                                         tableSchema.fields(),
                                                         dataSchema.fields(),
                                                         filters);
+
+                                Pair<int[], RowType> partitionPair = null;
+                                if (dataSchema.partitionKeys().size() != 0) {
+                                    Pair<int[], int[][]> partitionMappping =
+                                            PartitionUtils.constructPartitionMapping(
+                                                    dataSchema, dataProjection);
+                                    dataProjection = partitionMappping.getRight();
+                                    partitionPair =
+                                            Pair.of(
+                                                    partitionMappping.getLeft(),
+                                                    dataSchema.projectedLogicalRowType(
+                                                            dataSchema.partitionKeys()));
+                                }
+
                                 return new BulkFormatMapping(
                                         indexCastMapping.getIndexMapping(),
                                         indexCastMapping.getCastMapping(),
+                                        partitionPair,
                                         formatDiscover
                                                 .discover(formatIdentifier)
                                                 .createReaderFactory(
-                                                        rowType, dataProjection, dataFilters));
+                                                        dataSchema.logicalRowType(),

Review Comment:
   The type still contains the partition fields? That is odd.
   
   Can you just change the signature to `FormatReaderFactory createReaderFactory(RowType projectedType, @Nullable List<Predicate> filters)`?
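
One way to picture the request: the factory receives only the columns that must actually be read from the file, and a separate mapping records where each partition field belongs in the output. The sketch below is an assumption-laden illustration using field names as strings; `PartitionProjectionSketch`, `Result`, and the negative-index encoding are all hypothetical and do not reflect `PartitionUtils.constructPartitionMapping`:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: split a projection into file columns and partition columns. */
final class PartitionProjectionSketch {

    /** Columns to read from the file, plus one mapping entry per output position. */
    static final class Result {
        final List<String> fileColumns = new ArrayList<>();
        int[] mapping;
    }

    /**
     * mapping[i] >= 0 : output column i is file column mapping[i] (in the trimmed read).
     * mapping[i] <  0 : output column i is partition field -(mapping[i] + 1).
     */
    static Result split(List<String> projected, List<String> partitionKeys) {
        Result result = new Result();
        result.mapping = new int[projected.size()];
        for (int i = 0; i < projected.size(); i++) {
            int partitionIndex = partitionKeys.indexOf(projected.get(i));
            if (partitionIndex >= 0) {
                result.mapping[i] = -(partitionIndex + 1);
            } else {
                result.mapping[i] = result.fileColumns.size();
                result.fileColumns.add(projected.get(i));
            }
        }
        return result;
    }
}
```

With this split, the type handed to the format reader no longer mentions partition fields at all.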





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1393834137


##########
paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java:
##########
@@ -120,31 +129,63 @@ public FieldReader visitTimestampMillis(int precision) {
         return TIMESTAMP_MILLS_READER;
     }
 
+    @Override
+    public FieldReader visitTimestampMillis() {

Review Comment:
   Done





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1393795139


##########
paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.PartitionInfo;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+/** Tests for {@link VectorMappingUtils}. */
+public class VectorMappingUtilsTest {

Review Comment:
   Done





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1394010439


##########
paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.PartitionInfo;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+/** An implementation of {@link InternalRow} which provides a row the fixed partition value. */
+public class PartitionSettedRow implements InternalRow {
+
+    private final BinaryRow partition;
+    private final PartitionInfo partitionInfo;
+
+    protected InternalRow row;
+
+    protected PartitionSettedRow(PartitionInfo partitionInfo) {

Review Comment:
   Fixed this.





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1393835259


##########
paimon-core/src/main/java/org/apache/paimon/io/AbstractFileRecordIterator.java:
##########
@@ -32,11 +34,17 @@
  * @param <V> the row type.
  */
 public abstract class AbstractFileRecordIterator<V> implements RecordReader.RecordIterator<V> {
+
+    @Nullable private final PartitionSettedRow partitionSettedRow;
     @Nullable private final ProjectedRow projectedRow;
     @Nullable private final CastedRow castedRow;
 
     protected AbstractFileRecordIterator(

Review Comment:
   Fixed this and removed the class `AbstractFileRecordIterator`.



##########
paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java:
##########
@@ -31,6 +32,7 @@
 public class ColumnarRowIterator extends RecyclableIterator<InternalRow> {
 
     private final ColumnarRow rowData;
+    private boolean mapped = false;

Review Comment:
   Fixed this.





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1391104672


##########
paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java:
##########
@@ -93,9 +95,28 @@ public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws IOExcep
     }
 
     @Override
-    public OrcVectorizedReader createReader(FileIO fileIO, Path file, int poolSize)
+    public OrcVectorizedReader createReader(FileIO fileIO, Path file, int pooSize)

Review Comment:
   Revert this? `pooSize` looks like an accidental rename of `poolSize`.



##########
paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java:
##########
@@ -33,4 +36,19 @@ public interface FormatReaderFactory extends Serializable {
 
     RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
             throws IOException;
+
+    RecordReader<InternalRow> createReader(

Review Comment:
   Do we need to introduce these methods?
   
   I think we can just handle the value returned by the `RecordReader`:
   ```
   if (it is a partitioned table) {
     if (it is a `ColumnarRowIterator`) {
       then use `VectorMappingUtils.wrap`;
     } else {
       use a PartitionedRow to wrap it;
     }
   }
   ```





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1393592733


##########
paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.PartitionInfo;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+/** Tests for {@link VectorMappingUtils}. */
+public class VectorMappingUtilsTest {

Review Comment:
   Add tests for all types.





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1393946577


##########
paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.PartitionInfo;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+/** An implementation of {@link InternalRow} which provides a row the fixed partition value. */
+public class PartitionSettedRow implements InternalRow {
+
+    private final BinaryRow partition;
+    private final PartitionInfo partitionInfo;
+
+    protected InternalRow row;
+
+    protected PartitionSettedRow(PartitionInfo partitionInfo) {

Review Comment:
   If the partition info is empty, you should not use `PartitionSettedRow` to wrap the row.
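
A rough sketch of that guard, assuming a simplified single-method `Row` interface and a hypothetical negative-index encoding for partition fields (neither is Paimon's `InternalRow` or `PartitionInfo`):

```java
/** Hypothetical sketch: wrap a file row with partition values only when there are any. */
final class PartitionRowSketch {

    /** Simplified stand-in for a row; real rows expose typed getters. */
    interface Row {
        Object getField(int pos);
    }

    /**
     * mapping[pos] >= 0 : read field mapping[pos] from the file row.
     * mapping[pos] <  0 : read field -(mapping[pos] + 1) from the fixed partition.
     */
    static Row wrap(Row fileRow, Object[] partition, int[] mapping) {
        if (partition == null || partition.length == 0) {
            // Nothing to inject, so skip the extra indirection entirely.
            return fileRow;
        }
        return pos ->
                mapping[pos] >= 0
                        ? fileRow.getField(mapping[pos])
                        : partition[-(mapping[pos] + 1)];
    }
}
```

Returning the original row in the empty case avoids paying for a wrapper on every field access of an unpartitioned table.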





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1392555540


##########
paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java:
##########
@@ -33,4 +36,19 @@ public interface FormatReaderFactory extends Serializable {
 
     RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int poolSize)
             throws IOException;
+
+    RecordReader<InternalRow> createReader(

Review Comment:
   Done.





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1393582674


##########
paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java:
##########
@@ -31,6 +32,7 @@
 public class ColumnarRowIterator extends RecyclableIterator<InternalRow> {
 
     private final ColumnarRow rowData;
+    private boolean mapped = false;

Review Comment:
   Remove this field.



##########
paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java:
##########
@@ -221,4 +225,18 @@ public int hashCode() {
         throw new UnsupportedOperationException(
                 "ColumnarRowData do not support hashCode, please hash fields one by one!");
     }
+
+    public void mapping(@Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) {

Review Comment:
   Return a new `ColumnarRow`.
   If there is no performance issue, it is better to keep objects immutable.
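
The immutable variant might look roughly like the following. This is a sketch under stated assumptions: `ImmutableMappingSketch` is a hypothetical stand-in holding plain objects, not the real `ColumnarRow`, and a negative index is assumed to mean "column not present":

```java
/** Hypothetical sketch: mapping() returns a new instance instead of mutating the receiver. */
final class ImmutableMappingSketch {

    private final Object[] columns;

    ImmutableMappingSketch(Object[] columns) {
        // Defensive copy so later changes to the caller's array cannot leak in.
        this.columns = columns.clone();
    }

    Object column(int pos) {
        return columns[pos];
    }

    int arity() {
        return columns.length;
    }

    /** A null mapping means "no change", so the same instance can be returned safely. */
    ImmutableMappingSketch mapping(int[] indexMapping) {
        if (indexMapping == null) {
            return this;
        }
        Object[] mapped = new Object[indexMapping.length];
        for (int i = 0; i < indexMapping.length; i++) {
            mapped[i] = indexMapping[i] < 0 ? null : columns[indexMapping[i]];
        }
        return new ImmutableMappingSketch(mapped);
    }
}
```

Because the receiver never changes, no `mapped` flag is needed to guard against applying the mapping twice.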



##########
paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.PartitionInfo;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+/** An implementation of {@link InternalRow} which provides a row the fixed partition value. */
+public class PartitionSettedRow implements InternalRow {
+
+    private final BinaryRow paritition;

Review Comment:
   partition



##########
paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java:
##########
@@ -120,31 +129,63 @@ public FieldReader visitTimestampMillis(int precision) {
         return TIMESTAMP_MILLS_READER;
     }
 
+    @Override
+    public FieldReader visitTimestampMillis() {

Review Comment:
   You can change the original method to `visitTimestampMillis(@Nullable Integer precision)` instead.



##########
paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java:
##########
@@ -55,4 +57,13 @@ public InternalRow next() {
             return null;
         }
     }
+
+    public ColumnarRowIterator mapping(
+            @Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) {
+        if (!mapped) {
+            rowData.mapping(partitionInfo, indexMapping);
+            mapped = true;
+        }
+        return this;

Review Comment:
   Do not return `this`; return a new `ColumnarRowIterator`.



##########
paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java:
##########
@@ -483,24 +526,27 @@ public RowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
         public InternalRow read(Decoder decoder, Object reuse) throws IOException {
             GenericRow row;
             if (reuse instanceof GenericRow
-                    && ((GenericRow) reuse).getFieldCount() == projection.length) {
+                    && ((GenericRow) reuse).getFieldCount() == mapping.length) {
                 row = (GenericRow) reuse;
             } else {
-                row = new GenericRow(projection.length);
+                row = new GenericRow(mapping.length);
             }
 
+            Object[] values = new Object[fieldReaders.length];
             for (int i = 0; i < fieldReaders.length; i += 1) {
-                int[] columns = mapping[i];
-                FieldReader reader = fieldReaders[i];
-                if (columns == null) {
-                    reader.skip(decoder);
+                if (mappingBack[i] >= 0) {
+                    values[i] = fieldReaders[i].read(decoder, row.getField(mappingBack[i]));
                 } else {
-                    Object value = reader.read(decoder, row.getField(columns[0]));
-                    for (int column : columns) {
-                        row.setField(column, value);
-                    }
+                    fieldReaders[i].skip(decoder);
                 }
             }
+
+            values = VectorMappingUtils.mappingObjects(values, mapping);

Review Comment:
   This is odd: there is already a for loop over the values, so we don't need this `mappingObjects` call.
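
   A minimal sketch of the idea, assuming each file field maps to at most one output position. `DirectFieldPlacement` and `readRow` are hypothetical names, and plain `Object[]` arrays stand in for the decoder and `GenericRow`:

```java
/** Hypothetical sketch: place each decoded field straight into its output slot. */
final class DirectFieldPlacement {

    /**
     * @param decoded values in file (schema) order, standing in for what the field readers return
     * @param mappingBack for each file field, its position in the output row, or -1 to skip it
     * @param outputSize number of fields in the output row
     */
    static Object[] readRow(Object[] decoded, int[] mappingBack, int outputSize) {
        Object[] row = new Object[outputSize];
        for (int i = 0; i < decoded.length; i++) {
            if (mappingBack[i] >= 0) {
                // Write directly to the target position; no intermediate array to re-map.
                row[mappingBack[i]] = decoded[i];
            }
            // Otherwise a real reader would skip the field in the decoder.
        }
        return row;
    }
}
```

   Output positions that no file field maps to simply stay `null`, which covers requested fields that are missing from the file.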



##########
paimon-core/src/main/java/org/apache/paimon/io/AbstractFileRecordIterator.java:
##########
@@ -32,11 +34,17 @@
  * @param <V> the row type.
  */
 public abstract class AbstractFileRecordIterator<V> implements RecordReader.RecordIterator<V> {
+
+    @Nullable private final PartitionSettedRow partitionSettedRow;
     @Nullable private final ProjectedRow projectedRow;
     @Nullable private final CastedRow castedRow;
 
     protected AbstractFileRecordIterator(

Review Comment:
   We need to refine this Iterator.
   For example, use `RecordIterator.transform`.
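
   A sketch of how chained transforms could replace the nullable wrapper fields. `SimpleRecordIterator` is a simplified, hypothetical stand-in; the real `RecordIterator` interface in Paimon may differ in its details.

```java
import java.util.Iterator;
import java.util.function.Function;

/** Simplified stand-in for a record iterator; the real Paimon interface may differ. */
interface SimpleRecordIterator<T> {

    /** Returns the next record, or null when the iterator is exhausted. */
    T next();

    /** Wraps this iterator so that every non-null record passes through the function. */
    default <R> SimpleRecordIterator<R> transform(Function<T, R> function) {
        SimpleRecordIterator<T> source = this;
        return () -> {
            T record = source.next();
            return record == null ? null : function.apply(record);
        };
    }

    /** Adapts a java.util.Iterator; assumes it never yields null elements. */
    static <T> SimpleRecordIterator<T> of(Iterator<T> iterator) {
        return () -> iterator.hasNext() ? iterator.next() : null;
    }
}
```

   With this shape, each optional step (setting partition values, projecting, casting) becomes one `transform` call that is added only when needed, instead of a nullable field checked on every record.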



##########
paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java:
##########
@@ -432,49 +482,42 @@ public void skip(Decoder decoder) throws IOException {
         }
     }
 
-    public RowReader createRowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
-        return new RowReader(schema, fieldTypes, projection);
+    public RowReader createRowReader(Schema schema, List<DataField> fields) {
+        return new RowReader(schema, fields);
     }
 
     /** A {@link FieldReader} to read {@link InternalRow}. */
     public class RowReader implements FieldReader {
 
         private final FieldReader[] fieldReaders;
-        private final int[] projection;
-        private final int[][] mapping;
-
-        public RowReader(Schema schema, List<DataType> fieldTypes) {
-            this(schema, fieldTypes, IntStream.range(0, fieldTypes.size()).toArray());
-        }
+        private final int[] mapping;
+        private final int[] mappingBack;
 
-        public RowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
+        public RowReader(Schema schema, List<DataField> fields) {
             List<Schema.Field> schemaFields = schema.getFields();
-            this.fieldReaders = new FieldReader[schemaFields.size()];
-            for (int i = 0, fieldsSize = schemaFields.size(); i < fieldsSize; i++) {
-                Schema.Field field = schemaFields.get(i);
-                DataType type = fieldTypes.get(i);
-                fieldReaders[i] = visit(field.schema(), type);
-            }
-            this.projection = projection;
-
-            // use fieldTypes to compatible with less fields in avro
-
-            @SuppressWarnings("unchecked")
-            List<Integer>[] mapping = new List[fieldTypes.size()];
-            for (int i = 0; i < projection.length; i++) {
-                List<Integer> columns = mapping[projection[i]];
-                if (columns == null) {
-                    columns = new ArrayList<>();
-                    mapping[projection[i]] = columns;
+            this.mapping = new int[fields.size()];
+            this.mappingBack = new int[schemaFields.size()];
+            Arrays.fill(mappingBack, -1);
+            for (int i = 0; i < mapping.length; i++) {
+                DataField field = fields.get(i);
+                Schema.Field schemaField = schema.getField(field.name());
+                if (schemaField != null) {
+                    int index = schemaFields.indexOf(schemaField);
+                    this.mapping[i] = schemaFields.indexOf(schemaField);

Review Comment:
   Use `index` here instead of calling `schemaFields.indexOf(schemaField)` again.
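
   For illustration, a self-contained sketch of building both arrays with a single lookup per field. `FieldIndexMapping` is a hypothetical name, field names are plain strings here, and using `-1` for a requested field that is missing from the file is an assumption, since the hunk above is cut off before that branch.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of building forward and backward index mappings by field name. */
final class FieldIndexMapping {
    final int[] mapping;     // requested field position -> file field position, or -1 if absent
    final int[] mappingBack; // file field position -> requested field position, or -1 if unused

    FieldIndexMapping(List<String> fileFields, List<String> requestedFields) {
        mapping = new int[requestedFields.size()];
        mappingBack = new int[fileFields.size()];
        Arrays.fill(mappingBack, -1);
        for (int i = 0; i < mapping.length; i++) {
            // Look the position up once and reuse it for both arrays.
            int index = fileFields.indexOf(requestedFields.get(i));
            mapping[i] = index;
            if (index >= 0) {
                mappingBack[index] = i;
            }
        }
    }
}
```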





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "leaves12138 (via GitHub)" <gi...@apache.org>.
leaves12138 commented on code in PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257#discussion_r1393834446


##########
paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java:
##########
@@ -483,24 +526,27 @@ public RowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
         public InternalRow read(Decoder decoder, Object reuse) throws IOException {
             GenericRow row;
             if (reuse instanceof GenericRow
-                    && ((GenericRow) reuse).getFieldCount() == projection.length) {
+                    && ((GenericRow) reuse).getFieldCount() == mapping.length) {
                 row = (GenericRow) reuse;
             } else {
-                row = new GenericRow(projection.length);
+                row = new GenericRow(mapping.length);
             }
 
+            Object[] values = new Object[fieldReaders.length];
             for (int i = 0; i < fieldReaders.length; i += 1) {
-                int[] columns = mapping[i];
-                FieldReader reader = fieldReaders[i];
-                if (columns == null) {
-                    reader.skip(decoder);
+                if (mappingBack[i] >= 0) {
+                    values[i] = fieldReaders[i].read(decoder, row.getField(mappingBack[i]));
                 } else {
-                    Object value = reader.read(decoder, row.getField(columns[0]));
-                    for (int column : columns) {
-                        row.setField(column, value);
-                    }
+                    fieldReaders[i].skip(decoder);
                 }
             }
+
+            values = VectorMappingUtils.mappingObjects(values, mapping);

Review Comment:
   Fixed this.



##########
paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java:
##########
@@ -432,49 +482,42 @@ public void skip(Decoder decoder) throws IOException {
         }
     }
 
-    public RowReader createRowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
-        return new RowReader(schema, fieldTypes, projection);
+    public RowReader createRowReader(Schema schema, List<DataField> fields) {
+        return new RowReader(schema, fields);
     }
 
     /** A {@link FieldReader} to read {@link InternalRow}. */
     public class RowReader implements FieldReader {
 
         private final FieldReader[] fieldReaders;
-        private final int[] projection;
-        private final int[][] mapping;
-
-        public RowReader(Schema schema, List<DataType> fieldTypes) {
-            this(schema, fieldTypes, IntStream.range(0, fieldTypes.size()).toArray());
-        }
+        private final int[] mapping;
+        private final int[] mappingBack;
 
-        public RowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
+        public RowReader(Schema schema, List<DataField> fields) {
             List<Schema.Field> schemaFields = schema.getFields();
-            this.fieldReaders = new FieldReader[schemaFields.size()];
-            for (int i = 0, fieldsSize = schemaFields.size(); i < fieldsSize; i++) {
-                Schema.Field field = schemaFields.get(i);
-                DataType type = fieldTypes.get(i);
-                fieldReaders[i] = visit(field.schema(), type);
-            }
-            this.projection = projection;
-
-            // use fieldTypes to compatible with less fields in avro
-
-            @SuppressWarnings("unchecked")
-            List<Integer>[] mapping = new List[fieldTypes.size()];
-            for (int i = 0; i < projection.length; i++) {
-                List<Integer> columns = mapping[projection[i]];
-                if (columns == null) {
-                    columns = new ArrayList<>();
-                    mapping[projection[i]] = columns;
+            this.mapping = new int[fields.size()];
+            this.mappingBack = new int[schemaFields.size()];
+            Arrays.fill(mappingBack, -1);
+            for (int i = 0; i < mapping.length; i++) {
+                DataField field = fields.get(i);
+                Schema.Field schemaField = schema.getField(field.name());
+                if (schemaField != null) {
+                    int index = schemaFields.indexOf(schemaField);
+                    this.mapping[i] = schemaFields.indexOf(schemaField);

Review Comment:
   Fixed this.





Re: [PR] [core] index mapping push down to format reader and skip partition value reading from file [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #2257:
URL: https://github.com/apache/incubator-paimon/pull/2257

